Private
Public Access
1
0
Files
u-desk/internal/oss/qiniu/multipart_v2.go

428 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package qiniu
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"u-desk/internal/oss"
)
// PartInfo 分片信息
type PartInfo struct {
PartNumber int `json:"partNumber"` // 分片编号 (1-10000)
ETag string `json:"etag"` // 分片的 ETag
}
// InitiateMultipartUploadResult 初始化分片上传任务的结果
type InitiateMultipartUploadResult struct {
UploadId string `json:"uploadId"` // 上传任务 ID
}
// UploadPartResult 上传分片的结果
type UploadPartResult struct {
ETag string `json:"etag"` // 分片的 ETag
MD5 string `json:"md5"` // 分片的 MD5
}
// CompleteMultipartUploadResult 完成分片上传的结果
type CompleteMultipartUploadResult struct {
Key string `json:"key"` // 文件 key
Hash string `json:"hash"` // 文件 hash (ETag)
}
// InitiateMultipartUpload 初始化分片上传任务
// 根据: https://developer.qiniu.com/kodo/api/1502/initiate-multipart-upload
func (c *Client) InitiateMultipartUpload(ctx context.Context, key string) (string, error) {
// 生成上传 token
// 注意:分片上传 v2 需要 bucket 级别的 token不包含 key
token := c.generateBucketToken()
// 构建 URL
// 格式: POST /buckets/<BucketName>/objects/<EncodedObjectName>/uploads
encodedKey := base64.URLEncoding.EncodeToString([]byte(key))
url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads",
c.getUploadDomain(), c.config.Bucket, encodedKey)
// 构建请求体
requestBody := map[string]string{
"fname": key,
}
bodyBytes, _ := json.Marshal(requestBody)
// 创建请求
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bodyBytes))
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to create request", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "UpToken "+token)
// 发送请求
resp, err := c.httpClient.Do(req)
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to initiate multipart upload", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to read response", err)
}
if resp.StatusCode != 200 {
return "", oss.NewError("MULTIPART_ERROR",
fmt.Sprintf("initiate multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil)
}
// 解析响应
var result InitiateMultipartUploadResult
if err := json.Unmarshal(respBody, &result); err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to parse response", err)
}
return result.UploadId, nil
}
// UploadPart 上传分片
// 根据: https://developer.qiniu.com/kodo/api/6366/upload-part
func (c *Client) UploadPart(ctx context.Context, key, uploadId string, partNumber int, reader io.Reader) (string, error) {
// 生成上传 token分片上传 v2 使用 bucket 级别 token
token := c.generateBucketToken()
// 读取数据
data, err := io.ReadAll(reader)
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to read part data", err)
}
// 计算 MD5
hash := md5.New()
hash.Write(data)
md5Sum := hash.Sum(nil)
md5Base64 := base64.StdEncoding.EncodeToString(md5Sum)
// 构建 URL
// 格式: PUT /buckets/<BucketName>/objects/<EncodedObjectName>/uploads/<UploadId>/<PartNumber>
encodedKey := base64.URLEncoding.EncodeToString([]byte(key))
url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s/%d",
c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId, partNumber)
// 创建请求
req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(data))
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to create request", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-MD5", md5Base64)
req.Header.Set("Authorization", "UpToken "+token)
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data)))
// 发送请求
resp, err := c.httpClient.Do(req)
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to upload part", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to read response", err)
}
if resp.StatusCode != 200 {
return "", oss.NewError("MULTIPART_ERROR",
fmt.Sprintf("upload part failed with status %d: %s", resp.StatusCode, string(respBody)), nil)
}
// 解析响应
var result UploadPartResult
if err := json.Unmarshal(respBody, &result); err != nil {
return "", oss.NewError("MULTIPART_ERROR", "failed to parse response", err)
}
return result.ETag, nil
}
// CompleteMultipartUpload 完成分片上传
// 根据: https://developer.qiniu.com/kodo/api/6368/complete-multipart-upload
func (c *Client) CompleteMultipartUpload(ctx context.Context, key, uploadId string, parts []PartInfo) (*oss.UploadResult, error) {
// 生成上传 token分片上传 v2 使用 bucket 级别 token
token := c.generateBucketToken()
// 构建 URL
// 格式: POST /buckets/<BucketName>/objects/<EncodedObjectName>/uploads/<UploadId>
encodedKey := base64.URLEncoding.EncodeToString([]byte(key))
url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s",
c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId)
// 构建请求体
requestBody := map[string]interface{}{
"parts": parts,
"fname": key,
"mimeType": "",
}
bodyBytes, err := json.Marshal(requestBody)
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to marshal request", err)
}
// 创建请求
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bodyBytes))
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "UpToken "+token)
// 发送请求
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to complete multipart upload", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err)
}
if resp.StatusCode != 200 {
return nil, oss.NewError("MULTIPART_ERROR",
fmt.Sprintf("complete multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil)
}
// 解析响应
var result CompleteMultipartUploadResult
if err := json.Unmarshal(respBody, &result); err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err)
}
return &oss.UploadResult{
Key: result.Key,
ETag: result.Hash,
}, nil
}
// AbortMultipartUpload 中止分片上传任务
// 根据: https://developer.qiniu.com/kodo/api/1503/abort-multipart-upload
func (c *Client) AbortMultipartUpload(ctx context.Context, key, uploadId string) error {
// 生成上传 token分片上传 v2 使用 bucket 级别 token
token := c.generateBucketToken()
// 构建 URL
// 格式: DELETE /buckets/<BucketName>/objects/<EncodedObjectName>/uploads/<UploadId>
encodedKey := base64.URLEncoding.EncodeToString([]byte(key))
url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s",
c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId)
// 创建请求
req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil)
if err != nil {
return oss.NewError("MULTIPART_ERROR", "failed to create request", err)
}
req.Header.Set("Authorization", "UpToken "+token)
// 发送请求
resp, err := c.httpClient.Do(req)
if err != nil {
return oss.NewError("MULTIPART_ERROR", "failed to abort multipart upload", err)
}
defer resp.Body.Close()
// 200 或 204 都表示成功
if resp.StatusCode != 200 && resp.StatusCode != 204 {
respBody, _ := io.ReadAll(resp.Body)
return oss.NewError("MULTIPART_ERROR",
fmt.Sprintf("abort multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil)
}
return nil
}
// ListParts 列举已上传的分片
// 根据: https://developer.qiniu.com/kodo/api/1504/list-parts
func (c *Client) ListParts(ctx context.Context, key, uploadId string) ([]PartInfo, error) {
// 生成上传 token分片上传 v2 使用 bucket 级别 token
token := c.generateBucketToken()
// 构建 URL
// 格式: GET /buckets/<BucketName>/objects/<EncodedObjectName>/uploads/<UploadId>?partNumberMarker=<Marker>&maxParts=<MaxParts>
encodedKey := base64.URLEncoding.EncodeToString([]byte(key))
url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s",
c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId)
// 创建请求
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err)
}
req.Header.Set("Authorization", "UpToken "+token)
// 发送请求
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to list parts", err)
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err)
}
if resp.StatusCode != 200 {
return nil, oss.NewError("MULTIPART_ERROR",
fmt.Sprintf("list parts failed with status %d: %s", resp.StatusCode, string(respBody)), nil)
}
// 解析响应
var result struct {
Parts []struct {
PartNumber int `json:"partNumber"`
ETag string `json:"etag"`
Size int64 `json:"size"`
} `json:"parts"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err)
}
// 转换为 PartInfo
parts := make([]PartInfo, 0, len(result.Parts))
for _, p := range result.Parts {
parts = append(parts, PartInfo{
PartNumber: p.PartNumber,
ETag: p.ETag,
})
}
return parts, nil
}
// UploadMultipart 使用分片上传方式上传文件
// 自动将文件分片并上传,适用于大文件
// 注意:七牛云要求每个分片大小至少为 1MB除最后一个分片外
func (c *Client) UploadMultipart(ctx context.Context, key string, reader io.Reader, partSize int64) (*oss.UploadResult, error) {
// 默认分片大小为 4MB
if partSize <= 0 {
partSize = 4 * 1024 * 1024
}
// 七牛云要求:每个分片至少 1MB除最后一个分片外
const minPartSize = 1024 * 1024 // 1MB
if partSize < minPartSize {
partSize = minPartSize
}
// 1. 初始化上传任务
uploadId, err := c.InitiateMultipartUpload(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to initiate multipart upload: %w", err)
}
// 确保在失败时中止任务
defer func() {
if err != nil {
c.AbortMultipartUpload(context.Background(), key, uploadId)
}
}()
// 2. 读取所有数据并分片
data, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to read data: %w", err)
}
totalSize := int64(len(data))
// 如果文件太小,使用普通上传
if totalSize < minPartSize {
// 文件小于 1MB使用普通上传
uploadClient := NewUploadClient(c.config)
return uploadClient.Upload(ctx, key, bytes.NewReader(data))
}
partCount := int((totalSize + partSize - 1) / partSize) // 向上取整
// 3. 上传各个分片
parts := make([]PartInfo, 0, partCount)
for i := 0; i < partCount; i++ {
partNumber := i + 1
start := i * int(partSize)
end := start + int(partSize)
if end > len(data) {
end = len(data)
}
partData := data[start:end]
currentPartSize := int64(len(partData))
// 验证分片大小(除最后一个分片外,其他分片必须 >= 1MB
if i < partCount-1 && currentPartSize < minPartSize {
return nil, fmt.Errorf("part %d size (%d bytes) is less than minimum required size (%d bytes)",
partNumber, currentPartSize, minPartSize)
}
etag, err := c.UploadPart(ctx, key, uploadId, partNumber, bytes.NewReader(partData))
if err != nil {
return nil, fmt.Errorf("failed to upload part %d: %w", partNumber, err)
}
parts = append(parts, PartInfo{
PartNumber: partNumber,
ETag: etag,
})
}
// 4. 完成上传
result, err := c.CompleteMultipartUpload(ctx, key, uploadId, parts)
if err != nil {
return nil, fmt.Errorf("failed to complete multipart upload: %w", err)
}
// 成功,取消 defer 中的中止操作
err = nil
return result, nil
}
// UploadWithRetry 带重试的分片上传
// 支持失败重试,适用于不稳定的网络环境
func (c *Client) UploadWithRetry(ctx context.Context, key string, reader io.Reader, partSize int64, maxRetries int) (*oss.UploadResult, error) {
if maxRetries <= 0 {
maxRetries = 3
}
var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
// 每次重试需要重新读取数据
data, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
result, err := c.UploadMultipart(ctx, key, bytes.NewReader(data), partSize)
if err == nil {
return result, nil
}
lastErr = err
// 等待一段时间后重试
time.Sleep(time.Second * time.Duration(attempt+1))
}
return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}