package qiniu import ( "bytes" "context" "crypto/md5" "encoding/base64" "encoding/json" "fmt" "io" "net/http" "time" "u-desk/internal/oss" ) // PartInfo 分片信息 type PartInfo struct { PartNumber int `json:"partNumber"` // 分片编号 (1-10000) ETag string `json:"etag"` // 分片的 ETag } // InitiateMultipartUploadResult 初始化分片上传任务的结果 type InitiateMultipartUploadResult struct { UploadId string `json:"uploadId"` // 上传任务 ID } // UploadPartResult 上传分片的结果 type UploadPartResult struct { ETag string `json:"etag"` // 分片的 ETag MD5 string `json:"md5"` // 分片的 MD5 } // CompleteMultipartUploadResult 完成分片上传的结果 type CompleteMultipartUploadResult struct { Key string `json:"key"` // 文件 key Hash string `json:"hash"` // 文件 hash (ETag) } // InitiateMultipartUpload 初始化分片上传任务 // 根据: https://developer.qiniu.com/kodo/api/1502/initiate-multipart-upload func (c *Client) InitiateMultipartUpload(ctx context.Context, key string) (string, error) { // 生成上传 token // 注意:分片上传 v2 需要 bucket 级别的 token(不包含 key) token := c.generateBucketToken() // 构建 URL // 格式: POST /buckets//objects//uploads encodedKey := base64.URLEncoding.EncodeToString([]byte(key)) url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads", c.getUploadDomain(), c.config.Bucket, encodedKey) // 构建请求体 requestBody := map[string]string{ "fname": key, } bodyBytes, _ := json.Marshal(requestBody) // 创建请求 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bodyBytes)) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "UpToken "+token) // 发送请求 resp, err := c.httpClient.Do(req) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to initiate multipart upload", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return "", oss.NewError("MULTIPART_ERROR", fmt.Sprintf("initiate multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil) } // 解析响应 var result InitiateMultipartUploadResult if err := json.Unmarshal(respBody, &result); err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } return result.UploadId, nil } // UploadPart 上传分片 // 根据: https://developer.qiniu.com/kodo/api/6366/upload-part func (c *Client) UploadPart(ctx context.Context, key, uploadId string, partNumber int, reader io.Reader) (string, error) { // 生成上传 token(分片上传 v2 使用 bucket 级别 token) token := c.generateBucketToken() // 读取数据 data, err := io.ReadAll(reader) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to read part data", err) } // 计算 MD5 hash := md5.New() hash.Write(data) md5Sum := hash.Sum(nil) md5Base64 := base64.StdEncoding.EncodeToString(md5Sum) // 构建 URL // 格式: PUT /buckets//objects//uploads// encodedKey := base64.URLEncoding.EncodeToString([]byte(key)) url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s/%d", c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId, partNumber) // 创建请求 req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(data)) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-MD5", md5Base64) req.Header.Set("Authorization", "UpToken "+token) req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data))) // 发送请求 resp, err := c.httpClient.Do(req) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to upload part", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return "", oss.NewError("MULTIPART_ERROR", fmt.Sprintf("upload part failed with status %d: %s", resp.StatusCode, string(respBody)), nil) } // 解析响应 var result UploadPartResult if err := json.Unmarshal(respBody, &result); err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } return result.ETag, nil } // CompleteMultipartUpload 完成分片上传 // 根据: https://developer.qiniu.com/kodo/api/6368/complete-multipart-upload func (c *Client) CompleteMultipartUpload(ctx context.Context, key, uploadId string, parts []PartInfo) (*oss.UploadResult, error) { // 生成上传 token(分片上传 v2 使用 bucket 级别 token) token := c.generateBucketToken() // 构建 URL // 格式: POST /buckets//objects//uploads/ encodedKey := base64.URLEncoding.EncodeToString([]byte(key)) url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s", c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId) // 构建请求体 requestBody := map[string]interface{}{ "parts": parts, "fname": key, "mimeType": "", } bodyBytes, err := json.Marshal(requestBody) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to marshal request", err) } // 创建请求 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bodyBytes)) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "UpToken "+token) // 发送请求 resp, err := c.httpClient.Do(req) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to complete multipart upload", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return nil, oss.NewError("MULTIPART_ERROR", fmt.Sprintf("complete multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil) } // 解析响应 var result CompleteMultipartUploadResult if err := json.Unmarshal(respBody, &result); err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } return &oss.UploadResult{ Key: result.Key, ETag: result.Hash, }, nil } // AbortMultipartUpload 中止分片上传任务 // 根据: https://developer.qiniu.com/kodo/api/1503/abort-multipart-upload func (c *Client) AbortMultipartUpload(ctx context.Context, key, uploadId string) error { // 生成上传 token(分片上传 v2 使用 bucket 级别 token) token := c.generateBucketToken() // 构建 URL // 格式: DELETE /buckets//objects//uploads/ encodedKey := base64.URLEncoding.EncodeToString([]byte(key)) url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s", c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId) // 创建请求 req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil) if err != nil { return oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Authorization", "UpToken "+token) // 发送请求 resp, err := c.httpClient.Do(req) if err != nil { return oss.NewError("MULTIPART_ERROR", "failed to abort multipart upload", err) } defer resp.Body.Close() // 200 或 204 都表示成功 if resp.StatusCode != 200 && resp.StatusCode != 204 { respBody, _ := io.ReadAll(resp.Body) return oss.NewError("MULTIPART_ERROR", fmt.Sprintf("abort multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil) } return nil } // ListParts 列举已上传的分片 // 根据: https://developer.qiniu.com/kodo/api/1504/list-parts func (c *Client) ListParts(ctx context.Context, key, uploadId string) ([]PartInfo, error) { // 生成上传 token(分片上传 v2 使用 bucket 级别 token) token := c.generateBucketToken() // 构建 URL // 格式: GET /buckets//objects//uploads/?partNumberMarker=&maxParts= encodedKey := base64.URLEncoding.EncodeToString([]byte(key)) url := fmt.Sprintf("%s/buckets/%s/objects/%s/uploads/%s", c.getUploadDomain(), c.config.Bucket, encodedKey, uploadId) // 创建请求 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Authorization", "UpToken "+token) // 发送请求 resp, err := c.httpClient.Do(req) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to list parts", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return nil, oss.NewError("MULTIPART_ERROR", fmt.Sprintf("list parts failed with status %d: %s", resp.StatusCode, string(respBody)), nil) } // 解析响应 var result struct { Parts []struct { PartNumber int `json:"partNumber"` ETag string `json:"etag"` Size int64 `json:"size"` } `json:"parts"` } if err := json.Unmarshal(respBody, &result); err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } // 转换为 PartInfo parts := make([]PartInfo, 0, len(result.Parts)) for _, p := range result.Parts { parts = append(parts, PartInfo{ PartNumber: p.PartNumber, ETag: p.ETag, }) } return parts, nil } // UploadMultipart 使用分片上传方式上传文件 // 自动将文件分片并上传,适用于大文件 // 注意:七牛云要求每个分片大小至少为 1MB(除最后一个分片外) func (c *Client) UploadMultipart(ctx context.Context, key string, reader io.Reader, partSize int64) (*oss.UploadResult, error) { // 默认分片大小为 4MB if partSize <= 0 { partSize = 4 * 1024 * 1024 } // 七牛云要求:每个分片至少 1MB(除最后一个分片外) const minPartSize = 1024 * 1024 // 1MB if partSize < minPartSize { partSize = minPartSize } // 1. 初始化上传任务 uploadId, err := c.InitiateMultipartUpload(ctx, key) if err != nil { return nil, fmt.Errorf("failed to initiate multipart upload: %w", err) } // 确保在失败时中止任务 defer func() { if err != nil { c.AbortMultipartUpload(context.Background(), key, uploadId) } }() // 2. 读取所有数据并分片 data, err := io.ReadAll(reader) if err != nil { return nil, fmt.Errorf("failed to read data: %w", err) } totalSize := int64(len(data)) // 如果文件太小,使用普通上传 if totalSize < minPartSize { // 文件小于 1MB,使用普通上传 uploadClient := NewUploadClient(c.config) return uploadClient.Upload(ctx, key, bytes.NewReader(data)) } partCount := int((totalSize + partSize - 1) / partSize) // 向上取整 // 3. 上传各个分片 parts := make([]PartInfo, 0, partCount) for i := 0; i < partCount; i++ { partNumber := i + 1 start := i * int(partSize) end := start + int(partSize) if end > len(data) { end = len(data) } partData := data[start:end] currentPartSize := int64(len(partData)) // 验证分片大小(除最后一个分片外,其他分片必须 >= 1MB) if i < partCount-1 && currentPartSize < minPartSize { return nil, fmt.Errorf("part %d size (%d bytes) is less than minimum required size (%d bytes)", partNumber, currentPartSize, minPartSize) } etag, err := c.UploadPart(ctx, key, uploadId, partNumber, bytes.NewReader(partData)) if err != nil { return nil, fmt.Errorf("failed to upload part %d: %w", partNumber, err) } parts = append(parts, PartInfo{ PartNumber: partNumber, ETag: etag, }) } // 4. 完成上传 result, err := c.CompleteMultipartUpload(ctx, key, uploadId, parts) if err != nil { return nil, fmt.Errorf("failed to complete multipart upload: %w", err) } // 成功,取消 defer 中的中止操作 err = nil return result, nil } // UploadWithRetry 带重试的分片上传 // 支持失败重试,适用于不稳定的网络环境 func (c *Client) UploadWithRetry(ctx context.Context, key string, reader io.Reader, partSize int64, maxRetries int) (*oss.UploadResult, error) { if maxRetries <= 0 { maxRetries = 3 } var lastErr error for attempt := 0; attempt < maxRetries; attempt++ { // 每次重试需要重新读取数据 data, err := io.ReadAll(reader) if err != nil { return nil, err } result, err := c.UploadMultipart(ctx, key, bytes.NewReader(data), partSize) if err == nil { return result, nil } lastErr = err // 等待一段时间后重试 time.Sleep(time.Second * time.Duration(attempt+1)) } return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr) }