package service import ( "context" "fmt" "sync" "eslogad-be/internal/contract" "eslogad-be/internal/entities" "eslogad-be/internal/manager" "eslogad-be/internal/transformer" "github.com/google/uuid" ) type UserServiceImpl struct { userProcessor UserProcessor titleRepo TitleRepository } type TitleRepository interface { ListAll(ctx context.Context) ([]entities.Title, error) } func NewUserService(userProcessor UserProcessor, titleRepo TitleRepository) *UserServiceImpl { return &UserServiceImpl{ userProcessor: userProcessor, titleRepo: titleRepo, } } func (s *UserServiceImpl) CreateUser(ctx context.Context, req *contract.CreateUserRequest) (*contract.UserResponse, error) { return s.userProcessor.CreateUser(ctx, req) } func (s *UserServiceImpl) BulkCreateUsers(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateUsersResponse, error) { response := &contract.BulkCreateUsersResponse{ Created: []contract.UserResponse{}, Failed: []contract.BulkUserErrorResult{}, Summary: contract.BulkCreationSummary{ Total: len(req.Users), Succeeded: 0, Failed: 0, }, } batchSize := 50 for i := 0; i < len(req.Users); i += batchSize { end := i + batchSize if end > len(req.Users) { end = len(req.Users) } batch := req.Users[i:end] batchResults, err := s.processBulkUserBatch(ctx, batch) if err != nil { for _, userReq := range batch { response.Failed = append(response.Failed, contract.BulkUserErrorResult{ User: userReq, Error: "Batch processing error: " + err.Error(), }) response.Summary.Failed++ } continue } response.Created = append(response.Created, batchResults.Created...) response.Failed = append(response.Failed, batchResults.Failed...) response.Summary.Succeeded += batchResults.Summary.Succeeded response.Summary.Failed += batchResults.Summary.Failed } return response, nil } func (s *UserServiceImpl) BulkCreateUsersAsync(ctx context.Context, req *contract.BulkCreateUsersRequest) (*contract.BulkCreateAsyncResponse, error) { jobManager := manager.GetJobManager() jobID := jobManager.CreateJob() // Start async processing go s.processBulkUsersAsync(context.Background(), jobID, req) return &contract.BulkCreateAsyncResponse{ JobID: jobID, Message: fmt.Sprintf("Job started for %d users", len(req.Users)), Status: "processing", }, nil } func (s *UserServiceImpl) processBulkUsersAsync(ctx context.Context, jobID uuid.UUID, req *contract.BulkCreateUsersRequest) { jobManager := manager.GetJobManager() jobManager.UpdateJob(jobID, manager.JobStatusProcessing, fmt.Sprintf("Processing %d users", len(req.Users))) batchSize := 50 var wg sync.WaitGroup resultChan := make(chan *contract.BulkCreateUsersResponse, (len(req.Users)/batchSize)+1) // Process each batch independently in its own goroutine for i := 0; i < len(req.Users); i += batchSize { end := i + batchSize if end > len(req.Users) { end = len(req.Users) } batch := req.Users[i:end] wg.Add(1) // Launch goroutine for each batch go func(batchNum int, users []contract.BulkUserRequest) { defer wg.Done() batchResult := &contract.BulkCreateUsersResponse{ Created: []contract.UserResponse{}, Failed: []contract.BulkUserErrorResult{}, Summary: contract.BulkCreationSummary{ Total: len(users), Succeeded: 0, Failed: 0, }, } // Process batch created, failed, err := s.userProcessor.BulkCreateUsersWithTransaction(ctx, users) if err != nil { // If entire batch fails, mark all users as failed for _, userReq := range users { batchResult.Failed = append(batchResult.Failed, contract.BulkUserErrorResult{ User: userReq, Error: fmt.Sprintf("Batch %d error: %v", batchNum, err), }) batchResult.Summary.Failed++ } } else { batchResult.Created = created batchResult.Failed = failed batchResult.Summary.Succeeded = len(created) batchResult.Summary.Failed = len(failed) } resultChan <- batchResult }(i/batchSize, batch) } // Wait for all batches to complete go func() { wg.Wait() close(resultChan) }() // Aggregate results totalSummary := contract.BulkCreationSummary{ Total: len(req.Users), Succeeded: 0, Failed: 0, } allCreated := []contract.UserResponse{} allFailed := []contract.BulkUserErrorResult{} for result := range resultChan { allCreated = append(allCreated, result.Created...) allFailed = append(allFailed, result.Failed...) totalSummary.Succeeded += result.Summary.Succeeded totalSummary.Failed += result.Summary.Failed // Update job progress jobManager.UpdateJobResults(jobID, result.Created, result.Failed, result.Summary) } // Mark job as completed status := manager.JobStatusCompleted message := fmt.Sprintf("Completed: %d succeeded, %d failed out of %d total", totalSummary.Succeeded, totalSummary.Failed, totalSummary.Total) if totalSummary.Failed == totalSummary.Total { status = manager.JobStatusFailed message = "All user creations failed" } jobManager.UpdateJob(jobID, status, message) } func (s *UserServiceImpl) GetBulkJobStatus(ctx context.Context, jobID uuid.UUID) (*manager.BulkJobResult, error) { jobManager := manager.GetJobManager() job, exists := jobManager.GetJob(jobID) if !exists { return nil, fmt.Errorf("job not found: %s", jobID) } return job, nil } func (s *UserServiceImpl) processBulkUserBatch(ctx context.Context, batch []contract.BulkUserRequest) (*contract.BulkCreateUsersResponse, error) { response := &contract.BulkCreateUsersResponse{ Created: []contract.UserResponse{}, Failed: []contract.BulkUserErrorResult{}, Summary: contract.BulkCreationSummary{ Total: len(batch), Succeeded: 0, Failed: 0, }, } // Use transaction for batch processing created, failed, err := s.userProcessor.BulkCreateUsersWithTransaction(ctx, batch) if err != nil { return response, err } response.Created = created response.Failed = failed response.Summary.Succeeded = len(created) response.Summary.Failed = len(failed) return response, nil } func (s *UserServiceImpl) UpdateUser(ctx context.Context, id uuid.UUID, req *contract.UpdateUserRequest) (*contract.UserResponse, error) { return s.userProcessor.UpdateUser(ctx, id, req) } func (s *UserServiceImpl) DeleteUser(ctx context.Context, id uuid.UUID) error { return s.userProcessor.DeleteUser(ctx, id) } func (s *UserServiceImpl) GetUserByID(ctx context.Context, id uuid.UUID) (*contract.UserResponse, error) { return s.userProcessor.GetUserByID(ctx, id) } func (s *UserServiceImpl) GetUserByEmail(ctx context.Context, email string) (*contract.UserResponse, error) { return s.userProcessor.GetUserByEmail(ctx, email) } func (s *UserServiceImpl) ListUsers(ctx context.Context, req *contract.ListUsersRequest) (*contract.ListUsersResponse, error) { page := req.Page if page <= 0 { page = 1 } limit := req.Limit if limit <= 0 { limit = 10 } userResponses, totalCount, err := s.userProcessor.ListUsersWithFilters(ctx, req) if err != nil { return nil, err } return &contract.ListUsersResponse{ Users: userResponses, Pagination: transformer.CreatePaginationResponse(totalCount, page, limit), }, nil } func (s *UserServiceImpl) ChangePassword(ctx context.Context, userID uuid.UUID, req *contract.ChangePasswordRequest) error { return s.userProcessor.ChangePassword(ctx, userID, req) } func (s *UserServiceImpl) GetProfile(ctx context.Context, userID uuid.UUID) (*contract.UserProfileResponse, error) { prof, err := s.userProcessor.GetUserProfile(ctx, userID) if err != nil { return nil, err } if roles, err := s.userProcessor.GetUserRoles(ctx, userID); err == nil { prof.Roles = roles } return prof, nil } func (s *UserServiceImpl) UpdateProfile(ctx context.Context, userID uuid.UUID, req *contract.UpdateUserProfileRequest) (*contract.UserProfileResponse, error) { return s.userProcessor.UpdateUserProfile(ctx, userID, req) } func (s *UserServiceImpl) ListTitles(ctx context.Context) (*contract.ListTitlesResponse, error) { if s.titleRepo == nil { return &contract.ListTitlesResponse{Titles: []contract.TitleResponse{}}, nil } titles, err := s.titleRepo.ListAll(ctx) if err != nil { return nil, err } return &contract.ListTitlesResponse{Titles: transformer.TitlesToContract(titles)}, nil } // GetActiveUsersForMention retrieves active users for mention purposes func (s *UserServiceImpl) GetActiveUsersForMention(ctx context.Context, search *string, limit int) ([]contract.UserResponse, error) { return s.userProcessor.GetActiveUsersForMention(ctx, search, limit) }