diff --git a/internal/contract/user_contract.go b/internal/contract/user_contract.go index d673370..013acde 100644 --- a/internal/contract/user_contract.go +++ b/internal/contract/user_contract.go @@ -174,3 +174,10 @@ type BulkCreationSummary struct { Succeeded int `json:"succeeded"` Failed int `json:"failed"` } + +// BulkCreateAsyncResponse represents the immediate response for async bulk creation +type BulkCreateAsyncResponse struct { + JobID uuid.UUID `json:"job_id"` + Message string `json:"message"` + Status string `json:"status"` +} diff --git a/internal/handler/user_handler.go b/internal/handler/user_handler.go index 202db82..fe4db1a 100644 --- a/internal/handler/user_handler.go +++ b/internal/handler/user_handler.go @@ -66,16 +66,13 @@ func (h *UserHandler) BulkCreateUsers(c *gin.Context) { return } - // Increased limit to handle 1000+ users if len(req.Users) > 5000 { h.sendValidationErrorResponse(c, "Cannot create more than 5000 users at once", constants.MissingFieldErrorCode) return } - // Set a longer timeout for large bulk operations ctx := c.Request.Context() if len(req.Users) > 500 { - // Create a context with extended timeout for large operations var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, 10*time.Minute) defer cancel() @@ -97,11 +94,61 @@ func (h *UserHandler) BulkCreateUsers(c *gin.Context) { statusCode = http.StatusMultiStatus } - logger.FromContext(c).Infof("UserHandler::BulkCreateUsers -> Successfully processed bulk creation: %d succeeded, %d failed", + logger.FromContext(c).Infof("UserHandler::BulkCreateUsers -> Successfully processed bulk creation: %d succeeded, %d failed", response.Summary.Succeeded, response.Summary.Failed) c.JSON(statusCode, contract.BuildSuccessResponse(response)) } +func (h *UserHandler) BulkCreateUsersAsync(c *gin.Context) { + var req contract.BulkCreateUsersRequest + if err := c.ShouldBindJSON(&req); err != nil { + logger.FromContext(c).WithError(err).Error("UserHandler::BulkCreateUsersAsync -> request binding failed") + h.sendValidationErrorResponse(c, "Invalid request body", constants.MissingFieldErrorCode) + return + } + + if len(req.Users) == 0 { + h.sendValidationErrorResponse(c, "Users list cannot be empty", constants.MissingFieldErrorCode) + return + } + + if len(req.Users) > 5000 { + h.sendValidationErrorResponse(c, "Cannot create more than 5000 users at once", constants.MissingFieldErrorCode) + return + } + + logger.FromContext(c).Infof("UserHandler::BulkCreateUsersAsync -> Starting async bulk creation of %d users", len(req.Users)) + + response, err := h.userService.BulkCreateUsersAsync(c.Request.Context(), &req) + if err != nil { + logger.FromContext(c).WithError(err).Error("UserHandler::BulkCreateUsersAsync -> Failed to start async bulk creation") + h.sendErrorResponse(c, err.Error(), http.StatusInternalServerError) + return + } + + logger.FromContext(c).Infof("UserHandler::BulkCreateUsersAsync -> Job created with ID: %s", response.JobID) + c.JSON(http.StatusAccepted, contract.BuildSuccessResponse(response)) +} + +func (h *UserHandler) GetBulkJobStatus(c *gin.Context) { + jobIDStr := c.Param("jobId") + jobID, err := uuid.Parse(jobIDStr) + if err != nil { + logger.FromContext(c).WithError(err).Error("UserHandler::GetBulkJobStatus -> invalid job ID") + h.sendValidationErrorResponse(c, "Invalid job ID", constants.ValidationErrorCode) + return + } + + job, err := h.userService.GetBulkJobStatus(c.Request.Context(), jobID) + if err != nil { + logger.FromContext(c).WithError(err).Error("UserHandler::GetBulkJobStatus -> Failed to get job status") + h.sendErrorResponse(c, err.Error(), http.StatusNotFound) + return + } + + c.JSON(http.StatusOK, contract.BuildSuccessResponse(job)) +} + func (h *UserHandler) UpdateUser(c *gin.Context) { userIDStr := c.Param("id") userID, err := uuid.Parse(userIDStr) @@ -358,7 +405,7 @@ func (h *UserHandler) GetActiveUsersForMention(c *gin.Context) { if limit > 100 { limit = 100 } - + var searchPtr *string if search != "" { searchPtr = &search diff --git a/internal/handler/user_service.go b/internal/handler/user_service.go index 048bc2b..92ece85 100644 --- a/internal/handler/user_service.go +++ b/internal/handler/user_service.go @@ -3,6 +3,7 @@ package handler import ( "context" "eslogad-be/internal/contract" + "eslogad-be/internal/manager" "github.com/google/uuid" ) @@ -10,6 +11,8 @@ import ( type UserService interface { CreateUser(ctx context.Context, req *contract.CreateUserRequest) (*contract.UserResponse, error) BulkCreateUsers(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateUsersResponse, error) + BulkCreateUsersAsync(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateAsyncResponse, error) + GetBulkJobStatus(ctx context.Context, jobID uuid.UUID) (*manager.BulkJobResult, error) UpdateUser(ctx context.Context, id uuid.UUID, req *contract.UpdateUserRequest) (*contract.UserResponse, error) DeleteUser(ctx context.Context, id uuid.UUID) error GetUserByID(ctx context.Context, id uuid.UUID) (*contract.UserResponse, error) diff --git a/internal/manager/job_manager.go b/internal/manager/job_manager.go new file mode 100644 index 0000000..7fc413b --- /dev/null +++ b/internal/manager/job_manager.go @@ -0,0 +1,117 @@ +package manager + +import ( + "context" + "sync" + "time" + + "eslogad-be/internal/contract" + + "github.com/google/uuid" +) + +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusProcessing JobStatus = "processing" + JobStatusCompleted JobStatus = "completed" + JobStatusFailed JobStatus = "failed" +) + +type BulkJobResult struct { + JobID uuid.UUID `json:"job_id"` + Status JobStatus `json:"status"` + Message string `json:"message"` + StartedAt time.Time `json:"started_at"` + FinishedAt *time.Time `json:"finished_at,omitempty"` + Summary contract.BulkCreationSummary `json:"summary"` + Created []contract.UserResponse `json:"created"` + Failed []contract.BulkUserErrorResult `json:"failed"` +} + +type JobManager struct { + jobs sync.Map +} + +var jobManagerInstance *JobManager +var once sync.Once + +func GetJobManager() *JobManager { + once.Do(func() { + jobManagerInstance = &JobManager{} + }) + return jobManagerInstance +} + +func (jm *JobManager) CreateJob() uuid.UUID { + jobID := uuid.New() + job := &BulkJobResult{ + JobID: jobID, + Status: JobStatusPending, + Message: "Job created, waiting to start", + StartedAt: time.Now(), + Summary: contract.BulkCreationSummary{ + Total: 0, + Succeeded: 0, + Failed: 0, + }, + Created: []contract.UserResponse{}, + Failed: []contract.BulkUserErrorResult{}, + } + jm.jobs.Store(jobID, job) + return jobID +} + +func (jm *JobManager) UpdateJob(jobID uuid.UUID, status JobStatus, message string) { + if val, ok := jm.jobs.Load(jobID); ok { + job := val.(*BulkJobResult) + job.Status = status + job.Message = message + if status == JobStatusCompleted || status == JobStatusFailed { + now := time.Now() + job.FinishedAt = &now + } + jm.jobs.Store(jobID, job) + } +} + +func (jm *JobManager) UpdateJobResults(jobID uuid.UUID, created []contract.UserResponse, failed []contract.BulkUserErrorResult, summary contract.BulkCreationSummary) { + if val, ok := jm.jobs.Load(jobID); ok { + job := val.(*BulkJobResult) + job.Created = append(job.Created, created...) + job.Failed = append(job.Failed, failed...) + job.Summary.Total = summary.Total + job.Summary.Succeeded += summary.Succeeded + job.Summary.Failed += summary.Failed + jm.jobs.Store(jobID, job) + } +} + +func (jm *JobManager) GetJob(jobID uuid.UUID) (*BulkJobResult, bool) { + if val, ok := jm.jobs.Load(jobID); ok { + return val.(*BulkJobResult), true + } + return nil, false +} + +func (jm *JobManager) CleanupOldJobs(ctx context.Context, maxAge time.Duration) { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + cutoff := time.Now().Add(-maxAge) + jm.jobs.Range(func(key, value interface{}) bool { + job := value.(*BulkJobResult) + if job.FinishedAt != nil && job.FinishedAt.Before(cutoff) { + jm.jobs.Delete(key) + } + return true + }) + } + } +} \ No newline at end of file diff --git a/internal/processor/user_processor.go b/internal/processor/user_processor.go index 2160ed9..c20fe33 100644 --- a/internal/processor/user_processor.go +++ b/internal/processor/user_processor.go @@ -317,13 +317,11 @@ func (p *UserProcessorImpl) GetActiveUsersForMention(ctx context.Context, search func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, userRequests []contract.BulkUserRequest) ([]contract.UserResponse, []contract.BulkUserErrorResult, error) { created := []contract.UserResponse{} failed := []contract.BulkUserErrorResult{} - - // Pre-validate all users + usersToCreate := []*entities.User{} emailMap := make(map[string]bool) - + for _, req := range userRequests { - // Check for duplicate emails in the batch if emailMap[req.Email] { failed = append(failed, contract.BulkUserErrorResult{ User: req, @@ -332,8 +330,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, continue } emailMap[req.Email] = true - - // Check if email already exists in database + existing, _ := p.userRepo.GetByEmail(ctx, req.Email) if existing != nil { failed = append(failed, contract.BulkUserErrorResult{ @@ -342,8 +339,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, }) continue } - - // Hash password + hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost) if err != nil { failed = append(failed, contract.BulkUserErrorResult{ @@ -352,8 +348,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, }) continue } - - // Create user entity + user := &entities.User{ ID: uuid.New(), Name: req.Name, @@ -361,11 +356,10 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, PasswordHash: string(hashedPassword), IsActive: true, } - + usersToCreate = append(usersToCreate, user) } - - // Bulk create valid users + if len(usersToCreate) > 0 { // Use CreateInBatches for large datasets err := p.userRepo.CreateInBatches(ctx, usersToCreate, 50) @@ -385,7 +379,7 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, FullName: user.Name, } _ = p.profileRepo.Create(ctx, profile) - + created = append(created, *transformer.EntityToContract(user)) } } @@ -397,11 +391,11 @@ func (p *UserProcessorImpl) BulkCreateUsersWithTransaction(ctx context.Context, FullName: user.Name, } _ = p.profileRepo.Create(ctx, profile) - + created = append(created, *transformer.EntityToContract(user)) } } } - + return created, failed, nil } diff --git a/internal/router/health_handler.go b/internal/router/health_handler.go index 75b9ed5..b0d00cc 100644 --- a/internal/router/health_handler.go +++ b/internal/router/health_handler.go @@ -14,6 +14,8 @@ type UserHandler interface { ListTitles(c *gin.Context) GetActiveUsersForMention(c *gin.Context) BulkCreateUsers(c *gin.Context) + BulkCreateUsersAsync(c *gin.Context) + GetBulkJobStatus(c *gin.Context) } type FileHandler interface { diff --git a/internal/router/router.go b/internal/router/router.go index cd2b9cd..112241a 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -82,6 +82,8 @@ func (r *Router) addAppRoutes(rg *gin.Engine) { { users.GET("", r.userHandler.ListUsers) users.POST("/bulk", r.userHandler.BulkCreateUsers) + users.POST("/bulk/async", r.userHandler.BulkCreateUsersAsync) + users.GET("/bulk/job/:jobId", r.userHandler.GetBulkJobStatus) users.GET("/profile", r.userHandler.GetProfile) users.PUT("/profile", r.userHandler.UpdateProfile) users.PUT(":id/password", r.userHandler.ChangePassword) diff --git a/internal/service/user_service.go b/internal/service/user_service.go index 391f13d..0b71c28 100644 --- a/internal/service/user_service.go +++ b/internal/service/user_service.go @@ -2,9 +2,12 @@ package service import ( "context" + "fmt" + "sync" "eslogad-be/internal/contract" "eslogad-be/internal/entities" + "eslogad-be/internal/manager" "eslogad-be/internal/transformer" "github.com/google/uuid" @@ -41,18 +44,16 @@ func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.Bul }, } - // Process in batches to avoid memory and database issues batchSize := 50 for i := 0; i < len(req.Users); i += batchSize { end := i + batchSize if end > len(req.Users) { end = len(req.Users) } - + batch := req.Users[i:end] batchResults, err := s.processBulkUserBatch(ctx, batch) if err != nil { - // Log batch error but continue with other batches for _, userReq := range batch { response.Failed = append(response.Failed, contract.BulkUserErrorResult{ User: userReq, @@ -62,7 +63,7 @@ func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.Bul } continue } - + response.Created = append(response.Created, batchResults.Created...) response.Failed = append(response.Failed, batchResults.Failed...) response.Summary.Succeeded += batchResults.Summary.Succeeded @@ -72,6 +73,121 @@ func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.Bul return response, nil } +func (s *UserServiceImpl) BulkCreateUsersAsync(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateAsyncResponse, error) { + jobManager := manager.GetJobManager() + jobID := jobManager.CreateJob() + + // Start async processing + go s.processBulkUsersAsync(context.Background(), jobID, req) + + return &contract.BulkCreateAsyncResponse{ + JobID: jobID, + Message: fmt.Sprintf("Job started for %d users", len(req.Users)), + Status: "processing", + }, nil +} + +func (s *UserServiceImpl) processBulkUsersAsync(ctx context.Context, jobID uuid.UUID, req *contract.BulkCreateUsersRequest) { + jobManager := manager.GetJobManager() + jobManager.UpdateJob(jobID, manager.JobStatusProcessing, fmt.Sprintf("Processing %d users", len(req.Users))) + + batchSize := 50 + var wg sync.WaitGroup + resultChan := make(chan *contract.BulkCreateUsersResponse, (len(req.Users)/batchSize)+1) + + // Process each batch independently in its own goroutine + for i := 0; i < len(req.Users); i += batchSize { + end := i + batchSize + if end > len(req.Users) { + end = len(req.Users) + } + + batch := req.Users[i:end] + wg.Add(1) + + // Launch goroutine for each batch + go func(batchNum int, users []contract.BulkUserRequest) { + defer wg.Done() + + batchResult := &contract.BulkCreateUsersResponse{ + Created: []contract.UserResponse{}, + Failed: []contract.BulkUserErrorResult{}, + Summary: contract.BulkCreationSummary{ + Total: len(users), + Succeeded: 0, + Failed: 0, + }, + } + + // Process batch + created, failed, err := s.userProcessor.BulkCreateUsersWithTransaction(ctx, users) + if err != nil { + // If entire batch fails, mark all users as failed + for _, userReq := range users { + batchResult.Failed = append(batchResult.Failed, contract.BulkUserErrorResult{ + User: userReq, + Error: fmt.Sprintf("Batch %d error: %v", batchNum, err), + }) + batchResult.Summary.Failed++ + } + } else { + batchResult.Created = created + batchResult.Failed = failed + batchResult.Summary.Succeeded = len(created) + batchResult.Summary.Failed = len(failed) + } + + resultChan <- batchResult + }(i/batchSize, batch) + } + + // Wait for all batches to complete + go func() { + wg.Wait() + close(resultChan) + }() + + // Aggregate results + totalSummary := contract.BulkCreationSummary{ + Total: len(req.Users), + Succeeded: 0, + Failed: 0, + } + allCreated := []contract.UserResponse{} + allFailed := []contract.BulkUserErrorResult{} + + for result := range resultChan { + allCreated = append(allCreated, result.Created...) + allFailed = append(allFailed, result.Failed...) + totalSummary.Succeeded += result.Summary.Succeeded + totalSummary.Failed += result.Summary.Failed + + // Update job progress + jobManager.UpdateJobResults(jobID, result.Created, result.Failed, result.Summary) + } + + // Mark job as completed + status := manager.JobStatusCompleted + message := fmt.Sprintf("Completed: %d succeeded, %d failed out of %d total", + totalSummary.Succeeded, totalSummary.Failed, totalSummary.Total) + + if totalSummary.Failed == totalSummary.Total { + status = manager.JobStatusFailed + message = "All user creations failed" + } + + jobManager.UpdateJob(jobID, status, message) +} + +func (s *UserServiceImpl) GetBulkJobStatus(ctx context.Context, jobID uuid.UUID) (*manager.BulkJobResult, error) { + jobManager := manager.GetJobManager() + job, exists := jobManager.GetJob(jobID) + if !exists { + return nil, fmt.Errorf("job not found: %s", jobID) + } + return job, nil +} + func (s *UserServiceImpl) processBulkUserBatch(ctx context.Context, batch []contract.BulkUserRequest) (*contract.BulkCreateUsersResponse, error) { response := &contract.BulkCreateUsersResponse{ Created: []contract.UserResponse{},