add bulk users

This commit is contained in:
Aditya Siregar 2025-08-15 22:42:58 +07:00
parent e1a5e9efd3
commit 5966301165
8 changed files with 313 additions and 25 deletions

View File

@ -174,3 +174,10 @@ type BulkCreationSummary struct {
Succeeded int `json:"succeeded"`
Failed int `json:"failed"`
}
// BulkCreateAsyncResponse represents the immediate response for async bulk creation
type BulkCreateAsyncResponse struct {
JobID uuid.UUID `json:"job_id"`
Message string `json:"message"`
Status string `json:"status"`
}

View File

@ -66,16 +66,13 @@ func (h *UserHandler) BulkCreateUsers(c *gin.Context) {
return
}
// Increased limit to handle 1000+ users
if len(req.Users) > 5000 {
h.sendValidationErrorResponse(c, "Cannot create more than 5000 users at once", constants.MissingFieldErrorCode)
return
}
// Set a longer timeout for large bulk operations
ctx := c.Request.Context()
if len(req.Users) > 500 {
// Create a context with extended timeout for large operations
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
@ -97,11 +94,61 @@ func (h *UserHandler) BulkCreateUsers(c *gin.Context) {
statusCode = http.StatusMultiStatus
}
logger.FromContext(c).Infof("UserHandler::BulkCreateUsers -> Successfully processed bulk creation: %d succeeded, %d failed",
logger.FromContext(c).Infof("UserHandler::BulkCreateUsers -> Successfully processed bulk creation: %d succeeded, %d failed",
response.Summary.Succeeded, response.Summary.Failed)
c.JSON(statusCode, contract.BuildSuccessResponse(response))
}
func (h *UserHandler) BulkCreateUsersAsync(c *gin.Context) {
var req contract.BulkCreateUsersRequest
if err := c.ShouldBindJSON(&req); err != nil {
logger.FromContext(c).WithError(err).Error("UserHandler::BulkCreateUsersAsync -> request binding failed")
h.sendValidationErrorResponse(c, "Invalid request body", constants.MissingFieldErrorCode)
return
}
if len(req.Users) == 0 {
h.sendValidationErrorResponse(c, "Users list cannot be empty", constants.MissingFieldErrorCode)
return
}
if len(req.Users) > 5000 {
h.sendValidationErrorResponse(c, "Cannot create more than 5000 users at once", constants.MissingFieldErrorCode)
return
}
logger.FromContext(c).Infof("UserHandler::BulkCreateUsersAsync -> Starting async bulk creation of %d users", len(req.Users))
response, err := h.userService.BulkCreateUsersAsync(c.Request.Context(), &req)
if err != nil {
logger.FromContext(c).WithError(err).Error("UserHandler::BulkCreateUsersAsync -> Failed to start async bulk creation")
h.sendErrorResponse(c, err.Error(), http.StatusInternalServerError)
return
}
logger.FromContext(c).Infof("UserHandler::BulkCreateUsersAsync -> Job created with ID: %s", response.JobID)
c.JSON(http.StatusAccepted, contract.BuildSuccessResponse(response))
}
func (h *UserHandler) GetBulkJobStatus(c *gin.Context) {
jobIDStr := c.Param("jobId")
jobID, err := uuid.Parse(jobIDStr)
if err != nil {
logger.FromContext(c).WithError(err).Error("UserHandler::GetBulkJobStatus -> invalid job ID")
h.sendValidationErrorResponse(c, "Invalid job ID", constants.ValidationErrorCode)
return
}
job, err := h.userService.GetBulkJobStatus(c.Request.Context(), jobID)
if err != nil {
logger.FromContext(c).WithError(err).Error("UserHandler::GetBulkJobStatus -> Failed to get job status")
h.sendErrorResponse(c, err.Error(), http.StatusNotFound)
return
}
c.JSON(http.StatusOK, contract.BuildSuccessResponse(job))
}
func (h *UserHandler) UpdateUser(c *gin.Context) {
userIDStr := c.Param("id")
userID, err := uuid.Parse(userIDStr)
@ -358,7 +405,7 @@ func (h *UserHandler) GetActiveUsersForMention(c *gin.Context) {
if limit > 100 {
limit = 100
}
var searchPtr *string
if search != "" {
searchPtr = &search

View File

@ -3,6 +3,7 @@ package handler
import (
"context"
"eslogad-be/internal/contract"
"eslogad-be/internal/manager"
"github.com/google/uuid"
)
@ -10,6 +11,8 @@ import (
type UserService interface {
CreateUser(ctx context.Context, req *contract.CreateUserRequest) (*contract.UserResponse, error)
BulkCreateUsers(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateUsersResponse, error)
BulkCreateUsersAsync(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateAsyncResponse, error)
GetBulkJobStatus(ctx context.Context, jobID uuid.UUID) (*manager.BulkJobResult, error)
UpdateUser(ctx context.Context, id uuid.UUID, req *contract.UpdateUserRequest) (*contract.UserResponse, error)
DeleteUser(ctx context.Context, id uuid.UUID) error
GetUserByID(ctx context.Context, id uuid.UUID) (*contract.UserResponse, error)

View File

@ -0,0 +1,117 @@
package manager
import (
"context"
"sync"
"time"
"eslogad-be/internal/contract"
"github.com/google/uuid"
)
type JobStatus string
const (
JobStatusPending JobStatus = "pending"
JobStatusProcessing JobStatus = "processing"
JobStatusCompleted JobStatus = "completed"
JobStatusFailed JobStatus = "failed"
)
type BulkJobResult struct {
JobID uuid.UUID `json:"job_id"`
Status JobStatus `json:"status"`
Message string `json:"message"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
Summary contract.BulkCreationSummary `json:"summary"`
Created []contract.UserResponse `json:"created"`
Failed []contract.BulkUserErrorResult `json:"failed"`
}
type JobManager struct {
jobs sync.Map
}
var jobManagerInstance *JobManager
var once sync.Once
func GetJobManager() *JobManager {
once.Do(func() {
jobManagerInstance = &JobManager{}
})
return jobManagerInstance
}
func (jm *JobManager) CreateJob() uuid.UUID {
jobID := uuid.New()
job := &BulkJobResult{
JobID: jobID,
Status: JobStatusPending,
Message: "Job created, waiting to start",
StartedAt: time.Now(),
Summary: contract.BulkCreationSummary{
Total: 0,
Succeeded: 0,
Failed: 0,
},
Created: []contract.UserResponse{},
Failed: []contract.BulkUserErrorResult{},
}
jm.jobs.Store(jobID, job)
return jobID
}
func (jm *JobManager) UpdateJob(jobID uuid.UUID, status JobStatus, message string) {
if val, ok := jm.jobs.Load(jobID); ok {
job := val.(*BulkJobResult)
job.Status = status
job.Message = message
if status == JobStatusCompleted || status == JobStatusFailed {
now := time.Now()
job.FinishedAt = &now
}
jm.jobs.Store(jobID, job)
}
}
func (jm *JobManager) UpdateJobResults(jobID uuid.UUID, created []contract.UserResponse, failed []contract.BulkUserErrorResult, summary contract.BulkCreationSummary) {
if val, ok := jm.jobs.Load(jobID); ok {
job := val.(*BulkJobResult)
job.Created = append(job.Created, created...)
job.Failed = append(job.Failed, failed...)
job.Summary.Total = summary.Total
job.Summary.Succeeded += summary.Succeeded
job.Summary.Failed += summary.Failed
jm.jobs.Store(jobID, job)
}
}
func (jm *JobManager) GetJob(jobID uuid.UUID) (*BulkJobResult, bool) {
if val, ok := jm.jobs.Load(jobID); ok {
return val.(*BulkJobResult), true
}
return nil, false
}
func (jm *JobManager) CleanupOldJobs(ctx context.Context, maxAge time.Duration) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cutoff := time.Now().Add(-maxAge)
jm.jobs.Range(func(key, value interface{}) bool {
job := value.(*BulkJobResult)
if job.FinishedAt != nil && job.FinishedAt.Before(cutoff) {
jm.jobs.Delete(key)
}
return true
})
}
}
}

View File

@ -317,13 +317,11 @@ func (p *UserProcessorImpl) GetActiveUsersForMention(ctx context.Context, search
func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, userRequests []contract.BulkUserRequest) ([]contract.UserResponse, []contract.BulkUserErrorResult, error) {
created := []contract.UserResponse{}
failed := []contract.BulkUserErrorResult{}
// Pre-validate all users
usersToCreate := []*entities.User{}
emailMap := make(map[string]bool)
for _, req := range userRequests {
// Check for duplicate emails in the batch
if emailMap[req.Email] {
failed = append(failed, contract.BulkUserErrorResult{
User: req,
@ -332,8 +330,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context,
continue
}
emailMap[req.Email] = true
// Check if email already exists in database
existing, _ := p.userRepo.GetByEmail(ctx, req.Email)
if existing != nil {
failed = append(failed, contract.BulkUserErrorResult{
@ -342,8 +339,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context,
})
continue
}
// Hash password
hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
if err != nil {
failed = append(failed, contract.BulkUserErrorResult{
@ -352,8 +348,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context,
})
continue
}
// Create user entity
user := &entities.User{
ID: uuid.New(),
Name: req.Name,
@ -361,11 +356,10 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context,
PasswordHash: string(hashedPassword),
IsActive: true,
}
usersToCreate = append(usersToCreate, user)
}
// Bulk create valid users
if len(usersToCreate) > 0 {
// Use CreateInBatches for large datasets
err := p.userRepo.CreateInBatches(ctx, usersToCreate, 50)
@ -385,7 +379,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context,
FullName: user.Name,
}
_ = p.profileRepo.Create(ctx, profile)
created = append(created, *transformer.EntityToContract(user))
}
}
@ -397,11 +391,11 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context,
FullName: user.Name,
}
_ = p.profileRepo.Create(ctx, profile)
created = append(created, *transformer.EntityToContract(user))
}
}
}
return created, failed, nil
}

View File

@ -14,6 +14,8 @@ type UserHandler interface {
ListTitles(c *gin.Context)
GetActiveUsersForMention(c *gin.Context)
BulkCreateUsers(c *gin.Context)
BulkCreateUsersAsync(c *gin.Context)
GetBulkJobStatus(c *gin.Context)
}
type FileHandler interface {

View File

@ -82,6 +82,8 @@ func (r *Router) addAppRoutes(rg *gin.Engine) {
{
users.GET("", r.userHandler.ListUsers)
users.POST("/bulk", r.userHandler.BulkCreateUsers)
users.POST("/bulk/async", r.userHandler.BulkCreateUsersAsync)
users.GET("/bulk/job/:jobId", r.userHandler.GetBulkJobStatus)
users.GET("/profile", r.userHandler.GetProfile)
users.PUT("/profile", r.userHandler.UpdateProfile)
users.PUT(":id/password", r.userHandler.ChangePassword)

View File

@ -2,9 +2,12 @@ package service
import (
"context"
"fmt"
"sync"
"eslogad-be/internal/contract"
"eslogad-be/internal/entities"
"eslogad-be/internal/manager"
"eslogad-be/internal/transformer"
"github.com/google/uuid"
@ -41,18 +44,16 @@ func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.Bul
},
}
// Process in batches to avoid memory and database issues
batchSize := 50
for i := 0; i < len(req.Users); i += batchSize {
end := i + batchSize
if end > len(req.Users) {
end = len(req.Users)
}
batch := req.Users[i:end]
batchResults, err := s.processBulkUserBatch(ctx, batch)
if err != nil {
// Log batch error but continue with other batches
for _, userReq := range batch {
response.Failed = append(response.Failed, contract.BulkUserErrorResult{
User: userReq,
@ -62,7 +63,7 @@ func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.Bul
}
continue
}
response.Created = append(response.Created, batchResults.Created...)
response.Failed = append(response.Failed, batchResults.Failed...)
response.Summary.Succeeded += batchResults.Summary.Succeeded
@ -72,6 +73,121 @@ func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.Bul
return response, nil
}
func (s *UserServiceImpl) BulkCreateUsersAsync(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateAsyncResponse, error) {
jobManager := manager.GetJobManager()
jobID := jobManager.CreateJob()
// Start async processing
go s.processBulkUsersAsync(context.Background(), jobID, req)
return &contract.BulkCreateAsyncResponse{
JobID: jobID,
Message: fmt.Sprintf("Job started for %d users", len(req.Users)),
Status: "processing",
}, nil
}
func (s *UserServiceImpl) processBulkUsersAsync(ctx context.Context, jobID uuid.UUID, req *contract.BulkCreateUsersRequest) {
jobManager := manager.GetJobManager()
jobManager.UpdateJob(jobID, manager.JobStatusProcessing, fmt.Sprintf("Processing %d users", len(req.Users)))
batchSize := 50
var wg sync.WaitGroup
resultChan := make(chan *contract.BulkCreateUsersResponse, (len(req.Users)/batchSize)+1)
// Process each batch independently in its own goroutine
for i := 0; i < len(req.Users); i += batchSize {
end := i + batchSize
if end > len(req.Users) {
end = len(req.Users)
}
batch := req.Users[i:end]
wg.Add(1)
// Launch goroutine for each batch
go func(batchNum int, users []contract.BulkUserRequest) {
defer wg.Done()
batchResult := &contract.BulkCreateUsersResponse{
Created: []contract.UserResponse{},
Failed: []contract.BulkUserErrorResult{},
Summary: contract.BulkCreationSummary{
Total: len(users),
Succeeded: 0,
Failed: 0,
},
}
// Process batch
created, failed, err := s.userProcessor.BulkCreateUsersWithTransaction(ctx, users)
if err != nil {
// If entire batch fails, mark all users as failed
for _, userReq := range users {
batchResult.Failed = append(batchResult.Failed, contract.BulkUserErrorResult{
User: userReq,
Error: fmt.Sprintf("Batch %d error: %v", batchNum, err),
})
batchResult.Summary.Failed++
}
} else {
batchResult.Created = created
batchResult.Failed = failed
batchResult.Summary.Succeeded = len(created)
batchResult.Summary.Failed = len(failed)
}
resultChan <- batchResult
}(i/batchSize, batch)
}
// Wait for all batches to complete
go func() {
wg.Wait()
close(resultChan)
}()
// Aggregate results
totalSummary := contract.BulkCreationSummary{
Total: len(req.Users),
Succeeded: 0,
Failed: 0,
}
allCreated := []contract.UserResponse{}
allFailed := []contract.BulkUserErrorResult{}
for result := range resultChan {
allCreated = append(allCreated, result.Created...)
allFailed = append(allFailed, result.Failed...)
totalSummary.Succeeded += result.Summary.Succeeded
totalSummary.Failed += result.Summary.Failed
// Update job progress
jobManager.UpdateJobResults(jobID, result.Created, result.Failed, result.Summary)
}
// Mark job as completed
status := manager.JobStatusCompleted
message := fmt.Sprintf("Completed: %d succeeded, %d failed out of %d total",
totalSummary.Succeeded, totalSummary.Failed, totalSummary.Total)
if totalSummary.Failed == totalSummary.Total {
status = manager.JobStatusFailed
message = "All user creations failed"
}
jobManager.UpdateJob(jobID, status, message)
}
func (s *UserServiceImpl) GetBulkJobStatus(ctx context.Context, jobID uuid.UUID) (*manager.BulkJobResult, error) {
jobManager := manager.GetJobManager()
job, exists := jobManager.GetJob(jobID)
if !exists {
return nil, fmt.Errorf("job not found: %s", jobID)
}
return job, nil
}
func (s *UserServiceImpl) processBulkUserBatch(ctx context.Context, batch []contract.BulkUserRequest) (*contract.BulkCreateUsersResponse, error) {
response := &contract.BulkCreateUsersResponse{
Created: []contract.UserResponse{},