package controller import ( "dcard-backend-2024/pkg/inmem" "dcard-backend-2024/pkg/model" "errors" "net/http" "github.com/gin-gonic/gin" ) type AdController struct { adService model.AdService } func NewAdController(adService model.AdService) *AdController { return &AdController{ adService: adService, } } // GetAd godoc // @Summary Get an ad by ID // @Description Retrieves an ad by ID // @Tags Ad // @Accept json // @Produce json // @Param offset query int false "Offset for pagination" // @Param limit query int false "Limit for pagination" // @Param age query int false "Age" // @Param gender query string false "Gender" // @Param country query string false "Country" // @Param platform query string false "Platform" // @Success 200 {object} model.GetAdsPageResponse // @Failure 404 {object} model.Response // @Failure 500 {object} model.Response // @Router /api/v1/ad [get] func (ac *AdController) GetAd(c *gin.Context) { var req model.GetAdRequest if err := c.BindQuery(&req); err != nil { c.JSON(http.StatusBadRequest, model.Response{Msg: err.Error()}) return } ads, total, err := ac.adService.GetAds(c, &req) switch { case errors.Is(err, inmem.ErrNoAdsFound): c.JSON(http.StatusNotFound, model.Response{Msg: err.Error()}) return case err != nil: c.JSON(http.StatusInternalServerError, model.Response{Msg: err.Error()}) return } c.JSON(http.StatusOK, model.GetAdsPageResponse{Ads: ads, Total: total}) } // CreateAd godoc // @Summary Create an ad // @Description Create an ad // @Tags Ad // @Accept json // @Produce json // @Param ad body model.CreateAdRequest true "Ad object" // @Success 201 {object} model.CreateAdResponse // @Failure 400 {object} model.Response // @Failure 500 {object} model.Response // @Router /api/v1/ad [post] func (ac *AdController) CreateAd(c *gin.Context) { var ad model.CreateAdRequest if err := c.BindJSON(&ad); err != nil { c.JSON(http.StatusBadRequest, model.Response{Msg: err.Error()}) return } adID, err := ac.adService.CreateAd(c, &model.Ad{ Title: ad.Title, Content: ad.Content, StartAt: ad.StartAt, EndAt: ad.EndAt, AgeStart: ad.AgeStart, AgeEnd: ad.AgeEnd, Gender: ad.Gender, Country: ad.Country, Platform: ad.Platform, }, ) if err != nil { c.JSON(http.StatusInternalServerError, model.Response{Msg: err.Error()}) return } c.JSON(http.StatusCreated, model.Response{Msg: "Ad created", Data: adID}) }
package dispatcher import ( "dcard-backend-2024/pkg/model" "dcard-backend-2024/pkg/syncmap" "log" "sync/atomic" "time" ) type Dispatcher struct { Running atomic.Bool RequestChan chan interface{} ResponseChan *syncmap.Map Store model.InMemoryStore } func (r *Dispatcher) IsRunning() bool { return r.Running.Load() } func NewDispatcher(store model.InMemoryStore) *Dispatcher { return &Dispatcher{ RequestChan: make(chan interface{}), ResponseChan: &syncmap.Map{}, Store: store, } } func (r *Dispatcher) handleCreateBatchAdRequest(req *CreateBatchAdRequest) { // err := r.Store.CreateBatchAds(req.Ads) for _, ad := range req.Ads { if time.Now().After(ad.StartAt.T()) { _, err := r.Store.CreateAd(ad) if err != nil { log.Printf("failed to create ad %s: %v", ad.ID, err) } } else { log.Printf("ad %s is scheduled to start at %s", ad.ID, ad.StartAt.T()) time.AfterFunc(time.Until(ad.StartAt.T()), func() { _, err := r.Store.CreateAd(ad) if err != nil { log.Printf("failed to create ad %s: %v", ad.ID, err) } else { log.Printf("scheduled ad %s is created", ad.ID) } }) } } // use sync map to store the response channel if r.ResponseChan.Exists(req.RequestID) { r.ResponseChan.Load(req.RequestID) <- &CreateAdResponse{ Response: Response{RequestID: req.RequestID}, Err: nil, } } } func (r *Dispatcher) handleCreateAdRequest(req *CreateAdRequest) { if time.Now().After(req.Ad.StartAt.T()) { _, err := r.Store.CreateAd(req.Ad) if err != nil { log.Printf("failed to create ad %s: %v", req.Ad.ID, err) } } else { log.Printf("ad %s is scheduled to start at %s", req.Ad.ID, req.Ad.StartAt.T()) time.AfterFunc(time.Until(req.Ad.StartAt.T()), func() { _, err := r.Store.CreateAd(req.Ad) if err != nil { log.Printf("failed to create ad %s: %v", req.Ad.ID, err) } else { log.Printf("scheduled ad %s is created", req.Ad.ID) } }) } if r.ResponseChan.Exists(req.RequestID) { r.ResponseChan.Load(req.RequestID) <- &CreateAdResponse{ Response: Response{RequestID: req.RequestID}, AdID: req.Ad.ID.String(), Err: nil, } } } func (r *Dispatcher) handleGetAdRequest(req *GetAdRequest) { ads, total, err := r.Store.GetAds(req.GetAdRequest) if r.ResponseChan.Exists(req.RequestID) { r.ResponseChan.Load(req.RequestID) <- &GetAdResponse{ Response: Response{RequestID: req.RequestID}, Ads: ads, Total: total, Err: err, } } } func (r *Dispatcher) handleDeleteAdRequest(req *DeleteAdRequest) { _ = r.Store.DeleteAd(req.AdID) // if r.ResponseChan.Exists(req.RequestID) { // r.ResponseChan.Load(req.RequestID) <- &DeleteAdResponse{ // Response: Response{RequestID: req.RequestID}, // Err: err, // } // } } func (r *Dispatcher) Start() { r.Running.Store(true) log.Println("Dispatcher started") for { select { case req := <-r.RequestChan: switch req.(type) { case *CreateBatchAdRequest: r.handleCreateBatchAdRequest(req.(*CreateBatchAdRequest)) case *CreateAdRequest: // the create ad request is from the redis stream r.handleCreateAdRequest(req.(*CreateAdRequest)) case *GetAdRequest: go r.handleGetAdRequest(req.(*GetAdRequest)) case *DeleteAdRequest: r.handleDeleteAdRequest(req.(*DeleteAdRequest)) } } } }
package dispatcher import ( "dcard-backend-2024/pkg/model" "encoding/json" ) type IRequest interface { RequestUID() string } type Request struct { IRequest RequestID string `json:"request_id"` } func (r *Request) RequestUID() string { return r.RequestID } type IResult interface { Error() error } type Response struct { RequestID string `json:"request_id"` } type CreateAdRequest struct { Request *model.Ad } type CreateBatchAdRequest struct { Request Ads []*model.Ad } func (r *CreateAdRequest) ToMap() (map[string]interface{}, error) { jsonData, err := json.Marshal(r) if err != nil { return nil, err } var result map[string]interface{} if err := json.Unmarshal(jsonData, &result); err != nil { return nil, err } return result, nil } func (r *CreateAdRequest) FromMap(m map[string]interface{}) error { jsonData, err := json.Marshal(m) if err != nil { return err } if err := json.Unmarshal(jsonData, r); err != nil { return err } return nil } type CreateAdResponse struct { IResult Response AdID string Err error } func (r *CreateAdResponse) Error() error { return r.Err } type GetAdRequest struct { Request *model.GetAdRequest } type GetAdResponse struct { IResult Response Ads []*model.Ad Total int Err error } func (r *GetAdResponse) Error() error { return r.Err } type DeleteAdRequest struct { Request AdID string } type DeleteAdResponse struct { IResult Response Err error } func (r *DeleteAdResponse) Error() error { return r.Err }
package inmem import ( "dcard-backend-2024/pkg/model" "fmt" "log" "sync" cmap "github.com/orcaman/concurrent-map/v2" "github.com/wangjia184/sortedset" ) type IndexNode interface { AddAd(ad *model.Ad) GetAd(req *model.GetAdRequest) ([]*model.Ad, error) DeleteAd(ad *model.Ad) } type FieldStringer struct { Value interface{} } func (f FieldStringer) String() string { return fmt.Sprintf("%v", f.Value) } func (g *IndexInternalNode) AddAd(ad *model.Ad) { values, err := ad.GetValueByKey(g.Key) if err != nil { log.Printf("AddAd: Error getting value by key \"%s\": %s\n", g.Key, err) return } var wg sync.WaitGroup for _, v := range values { wg.Add(1) go func(v interface{}) { defer wg.Done() field := FieldStringer{Value: v} child, exists := g.Children.Get(field) if !exists { nextKey := model.Ad{}.GetNextIndexKey(g.Key) if nextKey == "" { child = NewIndexLeafNode() } else { child = NewIndexInternalNode(nextKey) } g.Children.Set(field, child) } child.AddAd(ad) }(v) } wg.Wait() } // GetAd implements IndexNode. func (g *IndexInternalNode) GetAd(req *model.GetAdRequest) ([]*model.Ad, error) { values, err := req.GetValueByKey(g.Key) if err != nil { return nil, fmt.Errorf("GetAd: Error getting value by key \"%s\": %s", g.Key, err) } Field := FieldStringer{Value: values} child, exists := g.Children.Get(Field) if !exists { return nil, nil } ads, err := child.GetAd(req) return ads, nil } // DeleteAd implements IndexNode. func (g *IndexInternalNode) DeleteAd(ad *model.Ad) { values, err := ad.GetValueByKey(g.Key) if err != nil { log.Printf("Error getting value by key \"%s\": %s\n", g.Key, err) return } for _, v := range values { field := FieldStringer{Value: v} child, exists := g.Children.Get(field) if !exists { continue } child.DeleteAd(ad) } } type IndexInternalNode struct { Key string // The key this node indexes on, e.g., "country", "age" Children cmap.ConcurrentMap[FieldStringer, IndexNode] // The children of this node } func NewIndexInternalNode(key string) IndexNode { return &IndexInternalNode{ Key: key, Children: cmap.NewStringer[FieldStringer, IndexNode](), } } type IndexLeafNode struct { mu sync.RWMutex Ads *sortedset.SortedSet // map[string]*model.Ad } func (g *IndexLeafNode) AddAd(ad *model.Ad) { g.mu.Lock() defer g.mu.Unlock() g.Ads.AddOrUpdate(ad.ID.String(), sortedset.SCORE(ad.CreatedAt.T().Unix()), ad) } // GetAd implements IndexNode. func (g *IndexLeafNode) GetAd(req *model.GetAdRequest) ([]*model.Ad, error) { g.mu.RLock() defer g.mu.RUnlock() ad := g.Ads.GetByRankRange(req.Offset, req.Offset+req.Limit, false) ret := make([]*model.Ad, len(ad)) for i, a := range ad { ret[i] = a.Value.(*model.Ad) } return ret, nil } // DeleteAd implements IndexNode. func (g *IndexLeafNode) DeleteAd(ad *model.Ad) { g.mu.Lock() defer g.mu.Unlock() g.Ads.Remove(ad.ID.String()) } func NewIndexLeafNode() IndexNode { return &IndexLeafNode{ Ads: sortedset.New(), } }
package inmem import ( "dcard-backend-2024/pkg/model" "fmt" "sync" ) var ( // ErrNoAdsFound is returned when the ad is not found in the store, 404 ErrNoAdsFound error = fmt.Errorf("no ads found") // ErrOffsetOutOfRange is returned when the offset is out of range, 404 ErrOffsetOutOfRange error = fmt.Errorf("offset is out of range") // ErrInvalidVersion is returned when the version is invalid, inconsistent with the store ErrInvalidVersion error = fmt.Errorf("invalid version") ) // InMemoryStoreImpl is an in-memory ad store implementation type InMemoryStoreImpl struct { // ads maps ad IDs to ads ads map[string]*model.Ad adIndexRoot IndexNode mutex sync.RWMutex } func NewInMemoryStore() model.InMemoryStore { return &InMemoryStoreImpl{ ads: make(map[string]*model.Ad), adIndexRoot: NewIndexInternalNode(model.Ad{}.GetNextIndexKey("")), mutex: sync.RWMutex{}, } } // CreateBatchAds creates a batch of ads in the store // (only used in the snapshot restore) func (s *InMemoryStoreImpl) CreateBatchAds(ads []*model.Ad) (err error) { s.mutex.Lock() defer s.mutex.Unlock() for _, ad := range ads { s.ads[ad.ID.String()] = ad s.adIndexRoot.AddAd(ad) } return nil } func (s *InMemoryStoreImpl) CreateAd(ad *model.Ad) (string, error) { s.mutex.Lock() defer s.mutex.Unlock() s.ads[ad.ID.String()] = ad s.adIndexRoot.AddAd(ad) return ad.ID.String(), nil } func (s *InMemoryStoreImpl) GetAds(req *model.GetAdRequest) (ads []*model.Ad, count int, err error) { s.mutex.RLock() defer s.mutex.RUnlock() ads, err = s.adIndexRoot.GetAd(req) if err != nil { return nil, 0, err } return ads, len(ads), nil } // DeleteAd implements model.InMemoryStore. func (s *InMemoryStoreImpl) DeleteAd(adID string) error { s.mutex.Lock() defer s.mutex.Unlock() s.adIndexRoot.DeleteAd(s.ads[adID]) return nil }
package service import ( "context" "database/sql" "dcard-backend-2024/pkg/dispatcher" "dcard-backend-2024/pkg/model" "encoding/json" "fmt" "log" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/bsm/redislock" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" "gorm.io/gorm" ) var ( ErrTimeout = fmt.Errorf("timeout") ErrUnknown = fmt.Errorf("unknown error") ) type AdService struct { shutdown atomic.Bool dispatcher *dispatcher.Dispatcher db *gorm.DB redis *redis.Client locker *redislock.Client asynqClient *asynq.Client lockKey string // adStream is the redis stream name for the ad adStream string mu sync.Mutex wg sync.WaitGroup onShutdown []func() Version int // Version is the latest version of the ad } // DeleteAd implements model.AdService. func (a *AdService) DeleteAd(ctx context.Context, adID string) error { ctx = context.Background() // RedisLock Lock Key: lock:ad lock, err := a.locker.Obtain(ctx, a.lockKey, 100*time.Millisecond, &redislock.Options{ RetryStrategy: redislock.LimitRetry(redislock.ExponentialBackoff(1*time.Millisecond, 5*time.Millisecond), 10), }) if err != nil { log.Printf("error obtaining lock: %v", err) return err } defer func() { // Release Lock err := lock.Release(ctx) if err != nil { log.Printf("error releasing lock: %v", err) } }() // Begin Transaction // UPDATE ads SET is_active = false AND version = `SELECT MAX(version) FROM ads` + 1 WHERE id = adID // DELETE FROM ads WHERE version < `SELECT MAX(version) FROM ads` AND is_active = false // Commit Transaction txn := a.db.Begin() if err = txn.Error; err != nil { return err } var maxVersion int if err = txn.Raw("SELECT COALESCE(MAX(version), 0) FROM ads").Scan(&maxVersion).Error; err != nil { txn.Rollback() return err } maxVersion++ if err = txn.Model(&model.Ad{}).Where("id = ?", adID). Update("is_active", false). Update("version", maxVersion).Error; err != nil { txn.Rollback() return err } if err = txn.Delete(&model.Ad{}, "version < ? AND is_active = false", maxVersion).Error; err != nil { txn.Rollback() return err } err = txn.Commit().Error if err != nil { return err } adReqMapStr, err := json.Marshal(dispatcher.DeleteAdRequest{AdID: adID}) if err != nil { log.Printf("error marshalling ad request: %v", err) return err } // Publish to Redis Stream // XADD ad 0-`SELECT MAX(version) FROM ads` {"ad": "adReqJsonStr"} _, err = a.redis.XAdd(ctx, &redis.XAddArgs{ Stream: a.adStream, NoMkStream: false, Approx: false, MaxLen: 100000, Values: []interface{}{ "ad", string(adReqMapStr), "type", "delete", }, ID: fmt.Sprintf("0-%d", maxVersion), }).Result() if err != nil { log.Printf("error publishing to redis: %v", err) return err } return nil } // Shutdown implements model.AdService. func (a *AdService) Shutdown(ctx context.Context) error { a.shutdown.Store(true) done := make(chan struct{}) a.mu.Lock() for _, f := range a.onShutdown { go f() } a.mu.Unlock() go func() { a.wg.Wait() close(done) }() select { case <-done: return nil case <-ctx.Done(): return ctx.Err() } } // Run implements model.AdService. func (a *AdService) Run() error { go a.dispatcher.Start() // Start the dispatcher stopCh := make(chan struct{}, 1) a.wg.Add(1) defer a.wg.Done() a.registerOnShutdown(func() { close(stopCh) }) operation := func() error { err := a.Restore() if err != nil { log.Printf("error restoring: %v", err) return err } err = a.Subscribe() if err != nil { log.Printf("error subscribing: %v", err) return err } return nil } maxRetry := 5 operationBackoff := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry)) for a.shutdown.Load() == false { select { case <-stopCh: return nil default: err := backoff.Retry(operation, operationBackoff) if err != nil { log.Printf("error running: %v", err) return err } } } return nil } // Restore restores the latest version of an ad from the database. // It returns the version number of the restored ad and any error encountered. // The error could be ErrRecordNotFound if no ad is found or a DB connection error. func (a *AdService) Restore() (err error) { txn := a.db.Begin(&sql.TxOptions{Isolation: sql.LevelRepeatableRead}) err = txn.Raw("SELECT COALESCE(MAX(version), 0) FROM ads").Scan(&a.Version).Error if err != nil { return err } var ads []*model.Ad err = txn.Where("is_active = ?", true).Find(&ads).Error if err != nil { return err } err = txn.Commit().Error if err != nil { return err } requestID := uuid.New().String() a.dispatcher.ResponseChan.Store(requestID, make(chan interface{}, 1)) defer a.dispatcher.ResponseChan.Delete(requestID) a.dispatcher.RequestChan <- &dispatcher.CreateBatchAdRequest{ Request: dispatcher.Request{RequestID: requestID}, Ads: ads, } select { case resp := <-a.dispatcher.ResponseChan.Load(requestID): if resp, ok := resp.(*dispatcher.CreateAdResponse); ok { if resp.Err == nil { log.Printf("Restored version: %d successfully\n", a.Version) } return resp.Err } case <-time.After(10 * time.Second): return ErrTimeout } return ErrUnknown } // Subscribe implements model.AdService. func (a *AdService) Subscribe() error { log.Printf("subscribing to redis stream with offset: %d", a.Version) ctx := context.Background() lastID := fmt.Sprintf("0-%d", a.Version) // Assuming offset can be mapped directly to Redis Stream IDs stopCh := make(chan struct{}, 1) a.wg.Add(1) defer a.wg.Done() a.registerOnShutdown(func() { close(stopCh) }) for a.shutdown.Load() == false { select { case <-stopCh: return nil default: // Reading from the stream xReadArgs := &redis.XReadArgs{ Streams: []string{a.adStream, lastID}, Block: 3 * time.Second, Count: 10, } msgs, err := a.redis.XRead(ctx, xReadArgs).Result() if err != nil { // log.Printf("error reading from redis: %v", err) continue } for _, msg := range msgs { for _, m := range msg.Messages { log.Printf("received message: %v\n", m) streamVersion, _ := strconv.ParseInt(strings.Split(m.ID, "-")[1], 10, 64) if a.Version < int(streamVersion) { a.Version = int(streamVersion) lastID = m.ID } else { // our version is the same or higher than the stream version continue } switch m.Values["type"].(string) { case "create": payload := &dispatcher.CreateAdRequest{} json.Unmarshal([]byte(m.Values["ad"].(string)), payload) a.dispatcher.RequestChan <- payload case "delete": payload := &dispatcher.DeleteAdRequest{} json.Unmarshal([]byte(m.Values["ad"].(string)), payload) a.dispatcher.RequestChan <- payload default: log.Printf("unknown message type: %s", m.Values["type"].(string)) } } } } } return nil } func (a *AdService) registerOnShutdown(f func()) { a.mu.Lock() a.onShutdown = append(a.onShutdown, f) a.mu.Unlock() } func (a *AdService) onShutdownNum() int { a.mu.Lock() defer a.mu.Unlock() return len(a.onShutdown) } // storeAndPublishWithLock // // 1. locks the lockKey // // 2. stores the ad in the database, and set the version of the new ad to `SELECT MAX(version) FROM ad“ + 1 // // 3. publishes the ad into redis stream. (ensure the message sequence number is the same as the ad's version) // // 4. releases the lock func (a *AdService) storeAndPublishWithLock(ctx context.Context, ad *model.Ad, requestID string) (err error) { ctx = context.Background() lock, err := a.locker.Obtain(ctx, a.lockKey, 100*time.Millisecond, &redislock.Options{ RetryStrategy: redislock.LimitRetry(redislock.ExponentialBackoff(1*time.Millisecond, 5*time.Millisecond), 10), }) if err != nil { log.Printf("error obtaining lock: %v", err) return } defer func() { err := lock.Release(ctx) if err != nil { log.Printf("error releasing lock: %v", err) } }() txn := a.db.Begin() if err = txn.Error; err != nil { return } var maxVersion int if err = txn.Raw("SELECT COALESCE(MAX(version), 0) FROM ads").Scan(&maxVersion).Error; err != nil { txn.Rollback() return } ad.Version = maxVersion + 1 if err = txn.Create(ad).Error; err != nil { txn.Rollback() return } err = txn.Commit().Error if err != nil { return } adReq := &dispatcher.CreateAdRequest{ Request: dispatcher.Request{RequestID: requestID}, Ad: ad, } // adReqMap, err := adReq.ToMap() adReqMapStr, err := json.Marshal(adReq) // return requestID, nil // log.Printf("adReqJsonStr: %s", adReqJsonStr) if err != nil { log.Printf("error marshalling ad request: %v", err) return } _, err = a.redis.XAdd(ctx, &redis.XAddArgs{ Stream: a.adStream, NoMkStream: false, Approx: false, MaxLen: 100000, Values: []interface{}{ "ad", string(adReqMapStr), "type", "create", }, ID: fmt.Sprintf("0-%d", ad.Version), }).Result() if err != nil { log.Printf("error publishing to redis: %v", err) return } return nil } // CreateAd implements model.AdService. func (a *AdService) CreateAd(ctx context.Context, ad *model.Ad) (adID string, err error) { a.wg.Add(1) defer a.wg.Done() requestID := uuid.New().String() a.dispatcher.ResponseChan.Store(requestID, make(chan interface{}, 1)) defer a.dispatcher.ResponseChan.Delete(requestID) err = a.storeAndPublishWithLock(ctx, ad, requestID) if err != nil { return "", err } err = a.registerAdDeleteTask(ad) if err != nil { return "", err } select { case resp := <-a.dispatcher.ResponseChan.Load(requestID): if resp, ok := resp.(*dispatcher.CreateAdResponse); ok { return resp.AdID, resp.Err } case <-time.After(3 * time.Second): return "", ErrTimeout } return "", ErrUnknown } func (a *AdService) registerAdDeleteTask(ad *model.Ad) error { payload := &model.AsynqDeletePayload{AdID: ad.ID.String()} task, err := payload.ToTask() if err != nil { return err } taskID := fmt.Sprintf("%s-%s", payload.TypeName(), ad.ID.String()) processTime := ad.EndAt.T() _, err = a.asynqClient.Enqueue( task, asynq.ProcessAt(processTime), asynq.TaskID(taskID), ) return err } // GetAds implements model.AdService. func (a *AdService) GetAds(ctx context.Context, req *model.GetAdRequest) ([]*model.Ad, int, error) { a.wg.Add(1) defer a.wg.Done() requestID := uuid.New().String() a.dispatcher.ResponseChan.Store(requestID, make(chan interface{}, 1)) defer a.dispatcher.ResponseChan.Delete(requestID) a.dispatcher.RequestChan <- &dispatcher.GetAdRequest{ Request: dispatcher.Request{RequestID: requestID}, GetAdRequest: req, } select { case resp := <-a.dispatcher.ResponseChan.Load(requestID): if resp, ok := resp.(*dispatcher.GetAdResponse); ok { return resp.Ads, resp.Total, resp.Err } case <-time.After(3 * time.Second): return nil, 0, ErrTimeout } return nil, 0, ErrUnknown } func NewAdService(dispatcher *dispatcher.Dispatcher, db *gorm.DB, redis *redis.Client, locker *redislock.Client, asynqClient *asynq.Client) model.AdService { return &AdService{ dispatcher: dispatcher, db: db, redis: redis, locker: locker, lockKey: "lock:ad", onShutdown: make([]func(), 0), adStream: "ad", asynqClient: asynqClient, shutdown: atomic.Bool{}, Version: 0, } }
package service import ( "context" "dcard-backend-2024/pkg/model" "encoding/json" "github.com/hibiken/asynq" ) type TaskService struct { adService model.AdService } // HandleDeleteAd implements model.TaskService. func (svc *TaskService) HandleDeleteAd(ctx context.Context, t *asynq.Task) error { var deletePayload model.AsynqDeletePayload if err := json.Unmarshal(t.Payload(), &deletePayload); err != nil { return err } return svc.adService.DeleteAd(ctx, deletePayload.AdID) } func (svc *TaskService) RegisterTaskHandler(mux *asynq.ServeMux) { mux.HandleFunc(model.AsynqDeletePayload{}.TypeName(), svc.HandleDeleteAd) } func NewTaskService(adService model.AdService) model.TaskService { return &TaskService{ adService: adService, } }
package syncmap import "sync" type Map struct { syncMap sync.Map } func (m *Map) LoadOrStore(key string, value chan interface{}) (chan interface{}, bool) { val, loaded := m.syncMap.LoadOrStore(key, value) return val.(chan interface{}), loaded } func (m *Map) Load(key string) chan interface{} { val, ok := m.syncMap.Load(key) if ok { return val.(chan interface{}) } else { return nil } } func (m *Map) Exists(key string) bool { _, ok := m.syncMap.Load(key) return ok } func (m *Map) Store(key string, value chan interface{}) { m.syncMap.Store(key, value) } func (m *Map) Delete(key string) { m.syncMap.Delete(key) }