package aliyun import ( "bytes" "context" "crypto/md5" "encoding/base64" "encoding/xml" "fmt" "io" "net/http" "strings" "time" "u-desk/internal/oss" ) // ============ 分片上传相关数据结构 ============ // PartInfo 分片信息 type PartInfo struct { PartNumber int `xml:"PartNumber"` // 分片编号 (1-10000) ETag string `xml:"ETag"` // 分片的 ETag Size int64 `xml:"Size"` // 分片大小 } // InitiateMultipartUploadResult 初始化分片上传的响应 type InitiateMultipartUploadResult struct { XMLName xml.Name `xml:"InitiateMultipartUploadResult"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` UploadID string `xml:"UploadId"` } // CompleteMultipartUploadRequest 完成分片上传的请求 type CompleteMultipartUploadRequest struct { XMLName xml.Name `xml:"CompleteMultipartUploadRequest"` Parts []PartInfo `xml:"Part"` } // CompleteMultipartUploadResult 完成分片上传的响应 type CompleteMultipartUploadResult struct { XMLName xml.Name `xml:"CompleteMultipartUploadResult"` Location string `xml:"Location"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` ETag string `xml:"ETag"` } // ListPartsResult 列举分片的响应 type ListPartsResult struct { XMLName xml.Name `xml:"ListPartsResult"` Bucket string `xml:"Bucket"` Key string `xml:"Key"` UploadID string `xml:"UploadId"` NextPartNumberMarker int `xml:"NextPartNumberMarker"` IsTruncated bool `xml:"IsTruncated"` MaxParts int `xml:"MaxParts"` PartNumberMarker int `xml:"PartNumberMarker"` StorageClass string `xml:"StorageClass"` Parts []PartInfo `xml:"Part"` } // ListMultipartUploadsResult 列举分片上传任务的响应 type ListMultipartUploadsResult struct { XMLName xml.Name `xml:"ListMultipartUploadsResult"` Bucket string `xml:"Bucket"` KeyMarker string `xml:"KeyMarker"` UploadIDMarker string `xml:"UploadIdMarker"` NextKeyMarker string `xml:"NextKeyMarker"` NextUploadIDMarker string `xml:"NextUploadIdMarker"` Delimiter string `xml:"Delimiter"` Prefix string `xml:"Prefix"` MaxUploads int `xml:"MaxUploads"` IsTruncated bool `xml:"IsTruncated"` Uploads []struct { Key string `xml:"Key"` UploadID string `xml:"UploadId"` Initiated string `xml:"Initiated"` StorageClass string `xml:"StorageClass"` } `xml:"Upload"` } // ============ 分片上传核心方法 ============ // InitiateMultipartUpload 初始化分片上传任务 // 参考: https://help.aliyun.com/zh/oss/developer-reference/initiate-multipart-upload func (c *Client) InitiateMultipartUpload(ctx context.Context, key string, contentType string) (string, error) { date := time.Now().UTC().Format(http.TimeFormat) // 构建签名字符串(包含 ?uploads 参数) path := "/" + c.config.Bucket + "/" + key + "?uploads" signature := c.generateSignature("POST", path, contentType, date, "") scheme := "https://" if !c.config.UseHTTPS { scheme = "http://" } url := scheme + c.config.Bucket + "." + c.config.Endpoint + "/" + key + "?uploads" req, err := http.NewRequestWithContext(ctx, "POST", url, nil) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Date", date) req.Header.Set("Authorization", "OSS "+c.config.AccessKeyID+":"+signature) if contentType != "" { req.Header.Set("Content-Type", contentType) } resp, err := c.httpClient.Do(req) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to initiate multipart upload", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return "", oss.NewError("MULTIPART_ERROR", fmt.Sprintf("initiate multipart upload failed with status %d: %s", resp.StatusCode, string(body)), nil) } // 解析 XML 响应 var result InitiateMultipartUploadResult if err := xml.Unmarshal(body, &result); err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } return result.UploadID, nil } // UploadPart 上传分片 // 参考: https://help.aliyun.com/zh/oss/developer-reference/upload-part func (c *Client) UploadPart(ctx context.Context, key, uploadID string, partNumber int, reader io.Reader) (string, error) { // 读取数据 data, err := io.ReadAll(reader) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to read part data", err) } // 计算 Content-MD5 hash := md5.Sum(data) contentMD5 := base64.StdEncoding.EncodeToString(hash[:]) date := time.Now().UTC().Format(http.TimeFormat) // 构建签名字符串(包含查询参数) path := fmt.Sprintf("/%s/%s?partNumber=%d&uploadId=%s", c.config.Bucket, key, partNumber, uploadID) signature := c.generateSignature("PUT", path, "application/octet-stream", date, contentMD5) scheme := "https://" if !c.config.UseHTTPS { scheme = "http://" } url := fmt.Sprintf("%s%s.%s/%s?partNumber=%d&uploadId=%s", scheme, c.config.Bucket, c.config.Endpoint, key, partNumber, uploadID) req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(data)) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Date", date) req.Header.Set("Authorization", "OSS "+c.config.AccessKeyID+":"+signature) req.Header.Set("Content-Type", "application/octet-stream") req.Header.Set("Content-MD5", contentMD5) resp, err := c.httpClient.Do(req) if err != nil { return "", oss.NewError("MULTIPART_ERROR", "failed to upload part", err) } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return "", oss.NewError("MULTIPART_ERROR", fmt.Sprintf("upload part failed with status %d: %s", resp.StatusCode, string(body)), nil) } // 从响应头获取 ETag etag := resp.Header.Get("ETag") etag = strings.Trim(etag, "\"") return etag, nil } // CompleteMultipartUpload 完成分片上传 // 参考: https://help.aliyun.com/zh/oss/developer-reference/complete-multipart-upload func (c *Client) CompleteMultipartUpload(ctx context.Context, key, uploadID string, parts []PartInfo) (*oss.UploadResult, error) { date := time.Now().UTC().Format(http.TimeFormat) // 构建请求体 - 手动构建 XML 以确保格式正确 // 阿里云要求的 XML 格式: // // // // 1 // "etag" // // ... // var xmlBuilder strings.Builder xmlBuilder.WriteString("\n") xmlBuilder.WriteString("\n") for _, part := range parts { // ETag 需要带引号 etag := part.ETag if !strings.HasPrefix(etag, "\"") { etag = "\"" + etag } if !strings.HasSuffix(etag, "\"") { etag = etag + "\"" } xmlBuilder.WriteString(fmt.Sprintf(" \n %d\n %s\n \n", part.PartNumber, etag)) } xmlBuilder.WriteString("") bodyBytes := []byte(xmlBuilder.String()) // 构建签名字符串 contentType := "application/xml" path := fmt.Sprintf("/%s/%s?uploadId=%s", c.config.Bucket, key, uploadID) signature := c.generateSignature("POST", path, contentType, date, "") scheme := "https://" if !c.config.UseHTTPS { scheme = "http://" } url := fmt.Sprintf("%s%s.%s/%s?uploadId=%s", scheme, c.config.Bucket, c.config.Endpoint, key, uploadID) req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bodyBytes)) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Date", date) req.Header.Set("Authorization", "OSS "+c.config.AccessKeyID+":"+signature) req.Header.Set("Content-Type", contentType) resp, err := c.httpClient.Do(req) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to complete multipart upload", err) } defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return nil, oss.NewError("MULTIPART_ERROR", fmt.Sprintf("complete multipart upload failed with status %d: %s", resp.StatusCode, string(respBody)), nil) } // 解析 XML 响应 var result CompleteMultipartUploadResult if err := xml.Unmarshal(respBody, &result); err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } return &oss.UploadResult{ Key: result.Key, ETag: strings.Trim(result.ETag, "\""), }, nil } // AbortMultipartUpload 中止分片上传任务 // 参考: https://help.aliyun.com/zh/oss/developer-reference/abort-multipart-upload func (c *Client) AbortMultipartUpload(ctx context.Context, key, uploadID string) error { date := time.Now().UTC().Format(http.TimeFormat) // 构建签名字符串 path := fmt.Sprintf("/%s/%s?uploadId=%s", c.config.Bucket, key, uploadID) signature := c.generateSignature("DELETE", path, "", date, "") scheme := "https://" if !c.config.UseHTTPS { scheme = "http://" } url := fmt.Sprintf("%s%s.%s/%s?uploadId=%s", scheme, c.config.Bucket, c.config.Endpoint, key, uploadID) req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil) if err != nil { return oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Date", date) req.Header.Set("Authorization", "OSS "+c.config.AccessKeyID+":"+signature) resp, err := c.httpClient.Do(req) if err != nil { return oss.NewError("MULTIPART_ERROR", "failed to abort multipart upload", err) } defer resp.Body.Close() if resp.StatusCode != 204 { body, _ := io.ReadAll(resp.Body) return oss.NewError("MULTIPART_ERROR", fmt.Sprintf("abort multipart upload failed with status %d: %s", resp.StatusCode, string(body)), nil) } return nil } // ListParts 列举已上传的分片 // 参考: https://help.aliyun.com/zh/oss/developer-reference/list-parts func (c *Client) ListParts(ctx context.Context, key, uploadID string, maxParts int, partNumberMarker int) ([]PartInfo, error) { date := time.Now().UTC().Format(http.TimeFormat) // 构建签名字符串 path := fmt.Sprintf("/%s/%s?uploadId=%s", c.config.Bucket, key, uploadID) signature := c.generateSignature("GET", path, "", date, "") scheme := "https://" if !c.config.UseHTTPS { scheme = "http://" } // 构建查询参数 params := []string{ fmt.Sprintf("uploadId=%s", uploadID), } if maxParts > 0 { params = append(params, fmt.Sprintf("max-parts=%d", maxParts)) } if partNumberMarker > 0 { params = append(params, fmt.Sprintf("part-number-marker=%d", partNumberMarker)) } url := fmt.Sprintf("%s%s.%s/%s?%s", scheme, c.config.Bucket, c.config.Endpoint, key, strings.Join(params, "&")) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Date", date) req.Header.Set("Authorization", "OSS "+c.config.AccessKeyID+":"+signature) resp, err := c.httpClient.Do(req) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to list parts", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return nil, oss.NewError("MULTIPART_ERROR", fmt.Sprintf("list parts failed with status %d: %s", resp.StatusCode, string(body)), nil) } // 解析 XML 响应 var result ListPartsResult if err := xml.Unmarshal(body, &result); err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } return result.Parts, nil } // ListMultipartUploads 列举所有进行中的分片上传任务 // 参考: 阿里云 OSS API 文档 func (c *Client) ListMultipartUploads(ctx context.Context, prefix string, maxUploads int) ([]struct { Key string UploadID string Initiated string StorageClass string }, error) { date := time.Now().UTC().Format(http.TimeFormat) // 构建签名字符串 path := "/" + c.config.Bucket + "?uploads" signature := c.generateSignature("GET", path, "", date, "") scheme := "https://" if !c.config.UseHTTPS { scheme = "http://" } // 构建查询参数 params := []string{"uploads"} if prefix != "" { params = append(params, "prefix="+prefix) } if maxUploads > 0 { params = append(params, fmt.Sprintf("max-uploads=%d", maxUploads)) } url := fmt.Sprintf("%s%s.%s/?%s", scheme, c.config.Bucket, c.config.Endpoint, strings.Join(params, "&")) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to create request", err) } req.Header.Set("Date", date) req.Header.Set("Authorization", "OSS "+c.config.AccessKeyID+":"+signature) resp, err := c.httpClient.Do(req) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to list multipart uploads", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to read response", err) } if resp.StatusCode != 200 { return nil, oss.NewError("MULTIPART_ERROR", fmt.Sprintf("list multipart uploads failed with status %d: %s", resp.StatusCode, string(body)), nil) } // 解析 XML 响应 var result ListMultipartUploadsResult if err := xml.Unmarshal(body, &result); err != nil { return nil, oss.NewError("MULTIPART_ERROR", "failed to parse response", err) } // 转换返回结果 uploads := make([]struct { Key string UploadID string Initiated string StorageClass string }, 0, len(result.Uploads)) for _, u := range result.Uploads { uploads = append(uploads, struct { Key string UploadID string Initiated string StorageClass string }{ Key: u.Key, UploadID: u.UploadID, Initiated: u.Initiated, StorageClass: u.StorageClass, }) } return uploads, nil } // ============ 高级辅助方法 ============ // UploadMultipart 使用分片上传方式上传文件 // 自动将文件分片并上传,适用于大文件 // 阿里云 OSS 要求:每个分片大小 100KB ~ 5GB,除最后一个分片外 func (c *Client) UploadMultipart(ctx context.Context, key string, reader io.Reader, partSize int64, contentType string) (*oss.UploadResult, error) { // 默认分片大小为 10MB if partSize <= 0 { partSize = 10 * 1024 * 1024 } // 阿里云 OSS 要求:每个分片大小至少 100KB const minPartSize = 100 * 1024 // 100KB if partSize < minPartSize { partSize = minPartSize } // 1. 初始化上传任务 uploadID, err := c.InitiateMultipartUpload(ctx, key, contentType) if err != nil { return nil, fmt.Errorf("failed to initiate multipart upload: %w", err) } // 确保在失败时中止任务 defer func() { if err != nil { c.AbortMultipartUpload(context.Background(), key, uploadID) } }() // 2. 读取所有数据并分片 data, err := io.ReadAll(reader) if err != nil { return nil, fmt.Errorf("failed to read data: %w", err) } totalSize := int64(len(data)) // 如果文件太小,使用普通上传 if totalSize < minPartSize { options := &oss.UploadOptions{ContentType: contentType} return c.Upload(ctx, key, bytes.NewReader(data), options) } partCount := int((totalSize + partSize - 1) / partSize) // 向上取整 // 阿里云限制:最多 10000 个分片 if partCount > 10000 { // 重新计算分片大小 partSize = (totalSize + 9999) / 10000 if partSize < minPartSize { partSize = minPartSize } partCount = int((totalSize + partSize - 1) / partSize) } // 3. 上传各个分片 parts := make([]PartInfo, 0, partCount) for i := 0; i < partCount; i++ { partNumber := i + 1 start := i * int(partSize) end := start + int(partSize) if end > len(data) { end = len(data) } partData := data[start:end] currentPartSize := int64(len(partData)) // 验证分片大小(除最后一个分片外,其他分片必须 >= 100KB) if i < partCount-1 && currentPartSize < minPartSize { return nil, fmt.Errorf("part %d size (%d bytes) is less than minimum required size (%d bytes)", partNumber, currentPartSize, minPartSize) } etag, err := c.UploadPart(ctx, key, uploadID, partNumber, bytes.NewReader(partData)) if err != nil { return nil, fmt.Errorf("failed to upload part %d: %w", partNumber, err) } parts = append(parts, PartInfo{ PartNumber: partNumber, ETag: etag, Size: currentPartSize, }) } // 4. 完成上传 result, err := c.CompleteMultipartUpload(ctx, key, uploadID, parts) if err != nil { return nil, fmt.Errorf("failed to complete multipart upload: %w", err) } // 成功,取消 defer 中的中止操作 err = nil return result, nil } // UploadWithRetry 带重试的分片上传 // 支持失败重试,适用于不稳定的网络环境 func (c *Client) UploadWithRetry(ctx context.Context, key string, reader io.Reader, partSize int64, maxRetries int, contentType string) (*oss.UploadResult, error) { if maxRetries <= 0 { maxRetries = 3 } var lastErr error for attempt := 0; attempt < maxRetries; attempt++ { // 每次重试需要重新读取数据 data, err := io.ReadAll(reader) if err != nil { return nil, err } result, err := c.UploadMultipart(ctx, key, bytes.NewReader(data), partSize, contentType) if err == nil { return result, nil } lastErr = err // 等待一段时间后重试 time.Sleep(time.Second * time.Duration(attempt+1)) } return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr) }