138 lines
3.6 KiB
Go
138 lines
3.6 KiB
Go
package middleware
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
const (
|
|
IdempotencyKeyHeader = "X-Idempotency-Key"
|
|
idempotencyTTL = 24 * time.Hour
|
|
idempotencyPrefix = "idempotency:"
|
|
)
|
|
|
|
type cachedResponse struct {
|
|
StatusCode int `json:"status_code"`
|
|
Headers map[string]string `json:"headers"`
|
|
Body string `json:"body"`
|
|
}
|
|
|
|
// IdempotencyMiddleware returns a Gin middleware that ensures idempotent processing
|
|
// for mutating operations. Client must send X-Idempotency-Key header.
|
|
func IdempotencyMiddleware(redisClient *redis.Client) gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
key := c.GetHeader(IdempotencyKeyHeader)
|
|
if key == "" {
|
|
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
|
|
"success": false,
|
|
"errors": []gin.H{
|
|
{
|
|
"code": "missing_idempotency_key",
|
|
"entity": "IdempotencyMiddleware",
|
|
"cause": "X-Idempotency-Key header is required",
|
|
},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
redisKey := fmt.Sprintf("%s%s", idempotencyPrefix, key)
|
|
ctx := context.Background()
|
|
|
|
fmt.Printf("[DEBUG] IdempotencyMiddleware: key=%s redisKey=%s\n", key, redisKey)
|
|
|
|
// Check if key already exists (request was already processed)
|
|
cached, err := redisClient.Get(ctx, redisKey).Result()
|
|
if err == nil {
|
|
// Key exists — return cached response
|
|
fmt.Printf("[DEBUG] IdempotencyMiddleware: cache HIT for key=%s\n", key)
|
|
var resp cachedResponse
|
|
if err := json.Unmarshal([]byte(cached), &resp); err == nil {
|
|
for k, v := range resp.Headers {
|
|
c.Writer.Header().Set(k, v)
|
|
}
|
|
c.Writer.Header().Set("X-Idempotent-Replay", "true")
|
|
c.Data(resp.StatusCode, "application/json", []byte(resp.Body))
|
|
c.Abort()
|
|
return
|
|
}
|
|
} else {
|
|
fmt.Printf("[DEBUG] IdempotencyMiddleware: cache MISS for key=%s err=%v\n", key, err)
|
|
}
|
|
|
|
// Mark key as in-progress to prevent concurrent duplicates
|
|
set, err := redisClient.SetNX(ctx, redisKey, "processing", idempotencyTTL).Result()
|
|
if err != nil {
|
|
// Redis error — proceed without idempotency (fail open)
|
|
c.Next()
|
|
return
|
|
}
|
|
if !set {
|
|
// Another request with the same key is being processed
|
|
c.AbortWithStatusJSON(http.StatusConflict, gin.H{
|
|
"success": false,
|
|
"errors": []gin.H{
|
|
{
|
|
"code": "request_in_progress",
|
|
"entity": "IdempotencyMiddleware",
|
|
"cause": "A request with this idempotency key is already being processed",
|
|
},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
|
|
// Capture response using a custom writer
|
|
writer := &responseCapture{
|
|
ResponseWriter: c.Writer,
|
|
body: &bytes.Buffer{},
|
|
}
|
|
c.Writer = writer
|
|
|
|
c.Next()
|
|
|
|
// After handler completes, cache the response only if successful (2xx)
|
|
statusCode := writer.Status()
|
|
if statusCode >= 200 && statusCode < 300 {
|
|
resp := cachedResponse{
|
|
StatusCode: statusCode,
|
|
Headers: map[string]string{
|
|
"Content-Type": writer.Header().Get("Content-Type"),
|
|
},
|
|
Body: writer.body.String(),
|
|
}
|
|
|
|
respJSON, err := json.Marshal(resp)
|
|
if err == nil {
|
|
redisClient.Set(ctx, redisKey, string(respJSON), idempotencyTTL)
|
|
}
|
|
} else {
|
|
// Remove the in-progress key so the client can retry with the same key
|
|
redisClient.Del(ctx, redisKey)
|
|
}
|
|
}
|
|
}
|
|
|
|
// responseCapture wraps gin.ResponseWriter to capture the response body
|
|
type responseCapture struct {
|
|
gin.ResponseWriter
|
|
body *bytes.Buffer
|
|
}
|
|
|
|
func (w *responseCapture) Write(b []byte) (int, error) {
|
|
w.body.Write(b)
|
|
return w.ResponseWriter.Write(b)
|
|
}
|
|
|
|
func (w *responseCapture) WriteString(s string) (int, error) {
|
|
w.body.WriteString(s)
|
|
return w.ResponseWriter.WriteString(s)
|
|
}
|