package controller
import (
"dcard-backend-2024/pkg/inmem"
"dcard-backend-2024/pkg/model"
"errors"
"net/http"
"github.com/gin-gonic/gin"
)
// AdController holds the HTTP handlers for ads and forwards their work to a
// model.AdService.
type AdController struct {
// adService is the service the handlers call to list and create ads.
adService model.AdService
}
// NewAdController returns an AdController whose handlers are served by
// adService.
func NewAdController(adService model.AdService) *AdController {
	ctrl := new(AdController)
	ctrl.adService = adService
	return ctrl
}
// GetAd godoc
// @Summary List ads matching the query
// @Description Retrieves a page of ads filtered by the optional query conditions
// @Tags Ad
// @Accept json
// @Produce json
// @Param offset query int false "Offset for pagination"
// @Param limit query int false "Limit for pagination"
// @Param age query int false "Age"
// @Param gender query string false "Gender"
// @Param country query string false "Country"
// @Param platform query string false "Platform"
// @Success 200 {object} model.GetAdsPageResponse
// @Failure 400 {object} model.Response
// @Failure 404 {object} model.Response
// @Failure 500 {object} model.Response
// @Router /api/v1/ad [get]
func (ac *AdController) GetAd(c *gin.Context) {
	var req model.GetAdRequest
	// ShouldBindQuery is used instead of BindQuery: BindQuery aborts the
	// request and writes the 400 header itself, so the JSON error body below
	// would be emitted after the headers were already sent.
	if err := c.ShouldBindQuery(&req); err != nil {
		c.JSON(http.StatusBadRequest, model.Response{Msg: err.Error()})
		return
	}

	ads, total, err := ac.adService.GetAds(c, &req)
	switch {
	case errors.Is(err, inmem.ErrNoAdsFound):
		// The store's "nothing found" sentinel maps to 404.
		c.JSON(http.StatusNotFound, model.Response{Msg: err.Error()})
		return
	case err != nil:
		c.JSON(http.StatusInternalServerError, model.Response{Msg: err.Error()})
		return
	}

	c.JSON(http.StatusOK, model.GetAdsPageResponse{Ads: ads, Total: total})
}
// CreateAd godoc
// @Summary Create an ad
// @Description Create an ad
// @Tags Ad
// @Accept json
// @Produce json
// @Param ad body model.CreateAdRequest true "Ad object"
// @Success 201 {object} model.CreateAdResponse
// @Failure 400 {object} model.Response
// @Failure 500 {object} model.Response
// @Router /api/v1/ad [post]
func (ac *AdController) CreateAd(c *gin.Context) {
	var ad model.CreateAdRequest
	// ShouldBindJSON is used instead of BindJSON: BindJSON aborts the request
	// and writes the 400 header itself, so the JSON error body below would be
	// emitted after the headers were already sent.
	if err := c.ShouldBindJSON(&ad); err != nil {
		c.JSON(http.StatusBadRequest, model.Response{Msg: err.Error()})
		return
	}

	// Copy the request fields into the model handed to the service.
	newAd := &model.Ad{
		Title:    ad.Title,
		Content:  ad.Content,
		StartAt:  ad.StartAt,
		EndAt:    ad.EndAt,
		AgeStart: ad.AgeStart,
		AgeEnd:   ad.AgeEnd,
		Gender:   ad.Gender,
		Country:  ad.Country,
		Platform: ad.Platform,
	}
	adID, err := ac.adService.CreateAd(c, newAd)
	if err != nil {
		c.JSON(http.StatusInternalServerError, model.Response{Msg: err.Error()})
		return
	}

	c.JSON(http.StatusCreated, model.Response{Msg: "Ad created", Data: adID})
}
package dispatcher
import (
"dcard-backend-2024/pkg/model"
"dcard-backend-2024/pkg/syncmap"
"log"
"sync/atomic"
"time"
)
// Dispatcher receives ad requests on RequestChan and applies them to Store,
// answering on the per-request channels kept in ResponseChan.
type Dispatcher struct {
// Running is set to true by Start.
Running atomic.Bool
// RequestChan carries the incoming requests consumed by Start.
RequestChan chan interface{}
// ResponseChan maps a request ID to the channel its response is sent on.
ResponseChan *syncmap.Map
// Store is the in-memory ad store the handlers read from and write to.
Store model.InMemoryStore
}
// IsRunning reports the current value of the Running flag.
func (r *Dispatcher) IsRunning() bool {
	running := r.Running.Load()
	return running
}
// NewDispatcher returns a Dispatcher backed by store, with an unbuffered
// request channel and an empty response-channel registry.
func NewDispatcher(store model.InMemoryStore) *Dispatcher {
	d := new(Dispatcher)
	d.RequestChan = make(chan interface{})
	d.ResponseChan = new(syncmap.Map)
	d.Store = store
	return d
}
// handleCreateBatchAdRequest adds every ad of req to the store. Ads whose
// start time has already passed are created immediately; the others are
// created by a timer that fires at their start time. Creation failures are
// only logged, and the response (sent when a channel is registered for the
// request ID) always carries a nil error.
func (r *Dispatcher) handleCreateBatchAdRequest(req *CreateBatchAdRequest) {
	for _, ad := range req.Ads {
		if ad == nil {
			continue
		}
		// Per-iteration copy: the timer callback below runs after the loop has
		// moved on, and before Go 1.22 every closure would otherwise observe
		// the last element of the slice.
		ad := ad
		startAt := ad.StartAt.T()
		if time.Now().After(startAt) {
			if _, err := r.Store.CreateAd(ad); err != nil {
				log.Printf("failed to create ad %s: %v", ad.ID, err)
			}
			continue
		}
		log.Printf("ad %s is scheduled to start at %s", ad.ID, startAt)
		time.AfterFunc(time.Until(startAt), func() {
			if _, err := r.Store.CreateAd(ad); err != nil {
				log.Printf("failed to create ad %s: %v", ad.ID, err)
				return
			}
			log.Printf("scheduled ad %s is created", ad.ID)
		})
	}

	// Load once instead of Exists followed by Load: the requester may delete
	// its channel in between, and sending on the resulting nil channel would
	// block the dispatcher forever.
	if ch := r.ResponseChan.Load(req.RequestID); ch != nil {
		ch <- &CreateAdResponse{
			Response: Response{RequestID: req.RequestID},
			Err:      nil,
		}
	}
}
// handleCreateAdRequest adds the ad of req to the store. An ad whose start
// time has already passed is created immediately and any store error is
// reported in the response; otherwise creation is deferred to a timer firing
// at the start time and the response reports success right away. A response is
// sent only when a channel is registered for the request ID.
func (r *Dispatcher) handleCreateAdRequest(req *CreateAdRequest) {
	ad := req.Ad
	if ad == nil {
		// A request decoded from an empty or malformed payload has no ad;
		// dereferencing it would crash the dispatcher loop.
		log.Printf("create ad request %s carries no ad, ignoring", req.RequestID)
		return
	}

	var createErr error
	startAt := ad.StartAt.T()
	if time.Now().After(startAt) {
		if _, createErr = r.Store.CreateAd(ad); createErr != nil {
			log.Printf("failed to create ad %s: %v", ad.ID, createErr)
		}
	} else {
		log.Printf("ad %s is scheduled to start at %s", ad.ID, startAt)
		time.AfterFunc(time.Until(startAt), func() {
			if _, err := r.Store.CreateAd(ad); err != nil {
				log.Printf("failed to create ad %s: %v", ad.ID, err)
				return
			}
			log.Printf("scheduled ad %s is created", ad.ID)
		})
	}

	// Load once instead of Exists followed by Load: the requester may delete
	// its channel in between, and sending on the resulting nil channel would
	// block the dispatcher forever.
	if ch := r.ResponseChan.Load(req.RequestID); ch != nil {
		ch <- &CreateAdResponse{
			Response: Response{RequestID: req.RequestID},
			AdID:     ad.ID.String(),
			Err:      createErr,
		}
	}
}
// handleGetAdRequest queries the store with the embedded model.GetAdRequest
// and sends the ads, their count and the store error to the channel registered
// for the request ID, if there still is one.
func (r *Dispatcher) handleGetAdRequest(req *GetAdRequest) {
	ads, total, err := r.Store.GetAds(req.GetAdRequest)

	// Load once instead of Exists followed by Load: the requester deletes its
	// channel when it gives up, and sending on the resulting nil channel would
	// block this goroutine forever.
	ch := r.ResponseChan.Load(req.RequestID)
	if ch == nil {
		return
	}
	ch <- &GetAdResponse{
		Response: Response{RequestID: req.RequestID},
		Ads:      ads,
		Total:    total,
		Err:      err,
	}
}
// handleDeleteAdRequest removes the ad identified by req.AdID from the store.
// No response is sent for delete requests, so a failure is logged here rather
// than silently discarded.
func (r *Dispatcher) handleDeleteAdRequest(req *DeleteAdRequest) {
	if err := r.Store.DeleteAd(req.AdID); err != nil {
		log.Printf("failed to delete ad %s: %v", req.AdID, err)
	}
}
// Start marks the dispatcher as running and processes requests from
// RequestChan until that channel is closed. Create, batch-create and delete
// requests are handled one at a time on the calling goroutine; get requests
// are each served on their own goroutine. Start blocks, so callers normally
// run it with `go`.
func (r *Dispatcher) Start() {
	r.Running.Store(true)
	// Clear the flag when the loop ends so IsRunning stops reporting true.
	defer r.Running.Store(false)
	log.Println("Dispatcher started")

	// Ranging over the channel ends the loop when it is closed; a bare
	// receive inside `for { select }` would spin on zero values instead.
	for req := range r.RequestChan {
		switch req := req.(type) {
		case *CreateBatchAdRequest:
			r.handleCreateBatchAdRequest(req)
		case *CreateAdRequest:
			// the create ad request is from the redis stream
			r.handleCreateAdRequest(req)
		case *GetAdRequest:
			go r.handleGetAdRequest(req)
		case *DeleteAdRequest:
			r.handleDeleteAdRequest(req)
		default:
			log.Printf("dispatcher: ignoring request of unexpected type %T", req)
		}
	}
}
package dispatcher
import (
"dcard-backend-2024/pkg/model"
"encoding/json"
)
// IRequest is implemented by requests that can report their unique ID.
type IRequest interface {
	RequestUID() string
}

// Request is the header embedded in the dispatcher request types. It carries
// the ID used to look up the channel a response is sent on.
type Request struct {
	IRequest
	RequestID string `json:"request_id"`
}

// RequestUID returns the RequestID of the request.
func (req *Request) RequestUID() string {
	id := req.RequestID
	return id
}
// IResult is implemented by dispatcher responses that can report the error of
// the operation they answer.
type IResult interface {
// Error returns the error carried by the response, or nil.
Error() error
}
// Response is the header embedded in the dispatcher response types.
type Response struct {
// RequestID is the ID of the request this response answers.
RequestID string `json:"request_id"`
}
// CreateAdRequest asks the dispatcher to add a single ad to the store.
type CreateAdRequest struct {
Request
// Ad is the ad to create; it is embedded, so its fields are promoted.
*model.Ad
}
// CreateBatchAdRequest asks the dispatcher to add several ads to the store in
// one request.
type CreateBatchAdRequest struct {
Request
// Ads are the ads to create.
Ads []*model.Ad
}
// ToMap converts the request into a generic map by encoding it as JSON and
// decoding that JSON back into a map[string]interface{}. It returns the first
// encoding or decoding error encountered.
func (r *CreateAdRequest) ToMap() (map[string]interface{}, error) {
	encoded, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}

	var fields map[string]interface{}
	err = json.Unmarshal(encoded, &fields)
	if err != nil {
		return nil, err
	}
	return fields, nil
}
// FromMap fills the request from m by encoding m as JSON and decoding that
// JSON into the receiver. It returns the first encoding or decoding error
// encountered.
func (r *CreateAdRequest) FromMap(m map[string]interface{}) error {
	encoded, err := json.Marshal(m)
	if err != nil {
		return err
	}
	return json.Unmarshal(encoded, r)
}
// CreateAdResponse is the dispatcher's answer to a create (or batch create)
// request.
type CreateAdResponse struct {
	IResult
	Response
	// AdID is the ID of the created ad.
	AdID string
	// Err is the error of the operation, or nil.
	Err error
}

// Error returns the error carried by the response.
func (resp *CreateAdResponse) Error() error {
	return resp.Err
}
// GetAdRequest asks the dispatcher to query the store for ads.
type GetAdRequest struct {
Request
// GetAdRequest is the query forwarded to the store.
*model.GetAdRequest
}
// GetAdResponse is the dispatcher's answer to a GetAdRequest.
type GetAdResponse struct {
	IResult
	Response
	// Ads are the ads returned by the store.
	Ads []*model.Ad
	// Total is the count returned by the store alongside Ads.
	Total int
	// Err is the error returned by the store, or nil.
	Err error
}

// Error returns the error carried by the response.
func (resp *GetAdResponse) Error() error {
	return resp.Err
}
// DeleteAdRequest asks the dispatcher to remove an ad from the store.
type DeleteAdRequest struct {
Request
// AdID is the ID of the ad to remove.
AdID string
}
// DeleteAdResponse describes the outcome of a delete request.
type DeleteAdResponse struct {
	IResult
	Response
	// Err is the error of the operation, or nil.
	Err error
}

// Error returns the error carried by the response.
func (resp *DeleteAdResponse) Error() error {
	return resp.Err
}
package inmem
import (
"dcard-backend-2024/pkg/model"
"fmt"
"log"
"sync"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/wangjia184/sortedset"
)
// IndexNode is a node of the ad index tree. Internal nodes route by one ad
// field; leaf nodes hold the ads themselves.
type IndexNode interface {
// AddAd inserts ad into the node.
AddAd(ad *model.Ad)
// GetAd returns the ads under the node selected by req.
GetAd(req *model.GetAdRequest) ([]*model.Ad, error)
// DeleteAd removes ad from the node.
DeleteAd(ad *model.Ad)
}
// FieldStringer wraps an arbitrary field value so that it can be used as a
// map key identified by its string form.
type FieldStringer struct {
	Value interface{}
}

// String renders the wrapped value with fmt's default formatting.
func (f FieldStringer) String() string {
	return fmt.Sprint(f.Value)
}
// AddAd indexes ad under every value it has for this node's key. For each
// value the matching child is looked up, created when missing (a leaf when
// there is no further index key, an internal node otherwise), and asked to
// add the ad. The values are processed concurrently and AddAd returns once
// all of them are done. When the values cannot be read the error is logged
// and nothing is added.
func (g *IndexInternalNode) AddAd(ad *model.Ad) {
	values, err := ad.GetValueByKey(g.Key)
	if err != nil {
		log.Printf("AddAd: Error getting value by key \"%s\": %s\n", g.Key, err)
		return
	}

	// addUnder inserts the ad below the child responsible for one value.
	addUnder := func(value interface{}) {
		field := FieldStringer{Value: value}
		child, found := g.Children.Get(field)
		if !found {
			nextKey := model.Ad{}.GetNextIndexKey(g.Key)
			if nextKey != "" {
				child = NewIndexInternalNode(nextKey)
			} else {
				child = NewIndexLeafNode()
			}
			g.Children.Set(field, child)
		}
		child.AddAd(ad)
	}

	var pending sync.WaitGroup
	for _, value := range values {
		pending.Add(1)
		go func(value interface{}) {
			defer pending.Done()
			addUnder(value)
		}(value)
	}
	pending.Wait()
}
// GetAd implements IndexNode.
//
// It reads the request's value for this node's key and delegates to the child
// registered under that value. When no such child exists it returns nil ads
// and a nil error. An error from reading the value is wrapped, and an error
// from the child is returned to the caller rather than being dropped.
func (g *IndexInternalNode) GetAd(req *model.GetAdRequest) ([]*model.Ad, error) {
	value, err := req.GetValueByKey(g.Key)
	if err != nil {
		return nil, fmt.Errorf("GetAd: Error getting value by key \"%s\": %w", g.Key, err)
	}

	child, exists := g.Children.Get(FieldStringer{Value: value})
	if !exists {
		return nil, nil
	}
	return child.GetAd(req)
}
// DeleteAd implements IndexNode.
//
// It asks the child registered under each of the ad's values for this node's
// key to delete the ad; values without a child are skipped. When the values
// cannot be read the error is logged and nothing is deleted.
func (g *IndexInternalNode) DeleteAd(ad *model.Ad) {
	values, err := ad.GetValueByKey(g.Key)
	if err != nil {
		log.Printf("Error getting value by key \"%s\": %s\n", g.Key, err)
		return
	}

	for _, value := range values {
		if child, found := g.Children.Get(FieldStringer{Value: value}); found {
			child.DeleteAd(ad)
		}
	}
}
// IndexInternalNode is an IndexNode that routes ads to child nodes according
// to the value an ad (or a query) has for Key.
type IndexInternalNode struct {
Key string // The key this node indexes on, e.g., "country", "age"
Children cmap.ConcurrentMap[FieldStringer, IndexNode] // The children of this node
}
// NewIndexInternalNode returns an internal index node for key with an empty
// set of children.
func NewIndexInternalNode(key string) IndexNode {
	node := new(IndexInternalNode)
	node.Key = key
	node.Children = cmap.NewStringer[FieldStringer, IndexNode]()
	return node
}
// IndexLeafNode is an IndexNode that stores the ads themselves in a sorted
// set.
type IndexLeafNode struct {
// mu guards Ads.
mu sync.RWMutex
Ads *sortedset.SortedSet // entries are keyed by ad ID and hold *model.Ad values (see AddAd)
}
// AddAd stores ad in the leaf, keyed by its ID and scored by the Unix time of
// its CreatedAt; an existing entry with the same ID is updated.
func (g *IndexLeafNode) AddAd(ad *model.Ad) {
	key := ad.ID.String()
	score := sortedset.SCORE(ad.CreatedAt.T().Unix())

	g.mu.Lock()
	defer g.mu.Unlock()
	g.Ads.AddOrUpdate(key, score, ad)
}
// GetAd implements IndexNode.
//
// It returns the page of ads selected by req.Offset and req.Limit: at most
// Limit ads, skipping the first Offset entries of the sorted set. A
// non-positive Limit or a negative Offset yields an empty page. The returned
// error is always nil.
func (g *IndexLeafNode) GetAd(req *model.GetAdRequest) ([]*model.Ad, error) {
	if req.Limit <= 0 || req.Offset < 0 {
		return []*model.Ad{}, nil
	}

	g.mu.RLock()
	defer g.mu.RUnlock()

	// GetByRankRange uses 1-based ranks and an inclusive end, so the page
	// covers ranks Offset+1 through Offset+Limit. Passing (Offset,
	// Offset+Limit) would return Limit+1 entries and repeat the last entry of
	// the previous page.
	nodes := g.Ads.GetByRankRange(req.Offset+1, req.Offset+req.Limit, false)
	page := make([]*model.Ad, 0, len(nodes))
	for _, node := range nodes {
		page = append(page, node.Value.(*model.Ad))
	}
	return page, nil
}
// DeleteAd implements IndexNode.
//
// It removes the entry keyed by the ad's ID from the leaf.
func (g *IndexLeafNode) DeleteAd(ad *model.Ad) {
	key := ad.ID.String()

	g.mu.Lock()
	defer g.mu.Unlock()
	g.Ads.Remove(key)
}
// NewIndexLeafNode returns a leaf index node with an empty sorted set.
func NewIndexLeafNode() IndexNode {
	leaf := new(IndexLeafNode)
	leaf.Ads = sortedset.New()
	return leaf
}
package inmem
import (
"dcard-backend-2024/pkg/model"
"fmt"
"sync"
)
// Sentinel errors of the in-memory store.
var (
	// ErrNoAdsFound is returned when the ad is not found in the store, 404
	ErrNoAdsFound = fmt.Errorf("no ads found")

	// ErrOffsetOutOfRange is returned when the offset is out of range, 404
	ErrOffsetOutOfRange = fmt.Errorf("offset is out of range")

	// ErrInvalidVersion is returned when the version is invalid, inconsistent with the store
	ErrInvalidVersion = fmt.Errorf("invalid version")
)
// InMemoryStoreImpl is an in-memory ad store implementation. Ads are kept
// both in a map keyed by ID and in an index tree used for queries.
type InMemoryStoreImpl struct {
// ads maps ad IDs to ads
ads map[string]*model.Ad
// adIndexRoot is the root of the index tree that GetAds queries.
adIndexRoot IndexNode
// mutex guards ads and adIndexRoot.
mutex sync.RWMutex
}
// NewInMemoryStore returns an empty store whose index tree is rooted at the
// first index key reported by model.Ad.
func NewInMemoryStore() model.InMemoryStore {
	rootKey := model.Ad{}.GetNextIndexKey("")
	store := &InMemoryStoreImpl{
		ads:         map[string]*model.Ad{},
		adIndexRoot: NewIndexInternalNode(rootKey),
	}
	return store
}
// CreateBatchAds creates a batch of ads in the store
// (only used in the snapshot restore)
//
// Every ad is recorded under its ID and added to the index while the write
// lock is held. The returned error is always nil.
func (s *InMemoryStoreImpl) CreateBatchAds(ads []*model.Ad) (err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for i := range ads {
		entry := ads[i]
		s.ads[entry.ID.String()] = entry
		s.adIndexRoot.AddAd(entry)
	}
	return nil
}
// CreateAd records ad under its ID, adds it to the index and returns the ID.
// The returned error is always nil.
func (s *InMemoryStoreImpl) CreateAd(ad *model.Ad) (string, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	id := ad.ID.String()
	s.ads[id] = ad
	s.adIndexRoot.AddAd(ad)
	return id, nil
}
// GetAds queries the index with req under the read lock. It returns the ads
// found and their number, or the index error with no ads and a zero count.
func (s *InMemoryStoreImpl) GetAds(req *model.GetAdRequest) (ads []*model.Ad, count int, err error) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	found, lookupErr := s.adIndexRoot.GetAd(req)
	if lookupErr != nil {
		return nil, 0, lookupErr
	}
	return found, len(found), nil
}
// DeleteAd implements model.InMemoryStore.
//
// It removes the ad identified by adID from both the index and the ID map.
// Deleting an ID that is not in the store is a no-op, so the call is
// idempotent. The returned error is always nil.
func (s *InMemoryStoreImpl) DeleteAd(adID string) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	ad, ok := s.ads[adID]
	if !ok {
		// Unknown ID: there is nothing to unindex, and handing a nil ad to the
		// index would dereference it.
		return nil
	}
	if ad != nil {
		s.adIndexRoot.DeleteAd(ad)
	}
	// Drop the map entry too, otherwise deleted ads accumulate forever.
	delete(s.ads, adID)
	return nil
}
package service
import (
"context"
"database/sql"
"dcard-backend-2024/pkg/dispatcher"
"dcard-backend-2024/pkg/model"
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/bsm/redislock"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
// ErrTimeout is returned when the dispatcher does not answer in time.
var ErrTimeout = fmt.Errorf("timeout")

// ErrUnknown is returned when the dispatcher answers with an unexpected
// response.
var ErrUnknown = fmt.Errorf("unknown error")
// AdService persists ads in the database, publishes changes to a Redis stream
// and serves reads through the dispatcher.
type AdService struct {
// shutdown is set by Shutdown and polled by the Run and Subscribe loops.
shutdown atomic.Bool
// dispatcher receives the in-memory store requests issued by this service.
dispatcher *dispatcher.Dispatcher
// db is the database handle used for the ads table.
db *gorm.DB
// redis is the client used to publish to and read from adStream.
redis *redis.Client
// locker hands out the Redis lock identified by lockKey.
locker *redislock.Client
// asynqClient enqueues the delayed ad-delete tasks.
asynqClient *asynq.Client
// lockKey is the key of the lock taken around store-and-publish operations.
lockKey string
// adStream is the redis stream name for the ad
adStream string
// mu guards onShutdown.
mu sync.Mutex
// wg tracks in-flight work that Shutdown waits for.
wg sync.WaitGroup
// onShutdown holds the callbacks launched by Shutdown.
onShutdown []func()
Version int // Version is the latest version of the ad
}
// DeleteAd implements model.AdService.
//
// While holding the Redis lock it, in one database transaction, marks the ad
// inactive with version MAX(version)+1 and deletes the inactive rows with a
// lower version. After the commit it publishes a "delete" message to the ad
// stream under the ID "0-<version>". Any lock, database, encoding or Redis
// error is returned.
func (a *AdService) DeleteAd(ctx context.Context, adID string) error {
	// NOTE(review): the caller's context is replaced, so cancellation of the
	// incoming ctx does not interrupt the operation — confirm this is intended.
	ctx = context.Background()

	// RedisLock Lock Key: lock:ad
	lock, err := a.locker.Obtain(ctx, a.lockKey, 100*time.Millisecond, &redislock.Options{
		RetryStrategy: redislock.LimitRetry(redislock.ExponentialBackoff(1*time.Millisecond, 5*time.Millisecond), 10),
	})
	if err != nil {
		log.Printf("error obtaining lock: %v", err)
		return err
	}
	defer func() {
		if releaseErr := lock.Release(ctx); releaseErr != nil {
			log.Printf("error releasing lock: %v", releaseErr)
		}
	}()

	txn := a.db.Begin()
	if err = txn.Error; err != nil {
		return err
	}

	var maxVersion int
	if err = txn.Raw("SELECT COALESCE(MAX(version), 0) FROM ads").Scan(&maxVersion).Error; err != nil {
		txn.Rollback()
		return err
	}
	maxVersion++

	// Both columns are written by a single Updates call. Chaining two Update
	// calls runs two statements on one reused statement object, which is not
	// a supported way of updating several columns.
	err = txn.Model(&model.Ad{}).Where("id = ?", adID).
		Updates(map[string]interface{}{
			"is_active": false,
			"version":   maxVersion,
		}).Error
	if err != nil {
		txn.Rollback()
		return err
	}

	// Purge rows that were deactivated by earlier deletions.
	if err = txn.Delete(&model.Ad{}, "version < ? AND is_active = false", maxVersion).Error; err != nil {
		txn.Rollback()
		return err
	}
	if err = txn.Commit().Error; err != nil {
		return err
	}

	adReqMapStr, err := json.Marshal(dispatcher.DeleteAdRequest{AdID: adID})
	if err != nil {
		log.Printf("error marshalling ad request: %v", err)
		return err
	}

	// Publish to Redis Stream; the message sequence number is the new version.
	_, err = a.redis.XAdd(ctx, &redis.XAddArgs{
		Stream:     a.adStream,
		NoMkStream: false,
		Approx:     false,
		MaxLen:     100000,
		Values: []interface{}{
			"ad", string(adReqMapStr),
			"type", "delete",
		},
		ID: fmt.Sprintf("0-%d", maxVersion),
	}).Result()
	if err != nil {
		log.Printf("error publishing to redis: %v", err)
		return err
	}
	return nil
}
// Shutdown implements model.AdService.
//
// It raises the shutdown flag, launches every registered shutdown callback on
// its own goroutine and then waits for the tracked work to finish. It returns
// nil once the work is done, or ctx.Err() if ctx ends first.
func (a *AdService) Shutdown(ctx context.Context) error {
	a.shutdown.Store(true)

	a.mu.Lock()
	for i := range a.onShutdown {
		go a.onShutdown[i]()
	}
	a.mu.Unlock()

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		a.wg.Wait()
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-finished:
		return nil
	}
}
// Run implements model.AdService.
//
// It starts the dispatcher and then, until shutdown, repeatedly restores the
// ads from the database and subscribes to the ad stream. A failing
// restore/subscribe cycle is retried with exponential backoff, at most five
// times; the error is returned once the retries are exhausted.
func (a *AdService) Run() error {
	go a.dispatcher.Start() // Start the dispatcher

	stopCh := make(chan struct{}, 1)
	a.wg.Add(1)
	defer a.wg.Done()
	a.registerOnShutdown(func() { close(stopCh) })

	// restoreAndSubscribe is one cycle of the service's main loop.
	restoreAndSubscribe := func() error {
		if err := a.Restore(); err != nil {
			log.Printf("error restoring: %v", err)
			return err
		}
		if err := a.Subscribe(); err != nil {
			log.Printf("error subscribing: %v", err)
			return err
		}
		return nil
	}

	const maxRetry = 5
	retryPolicy := backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry))

	for !a.shutdown.Load() {
		select {
		case <-stopCh:
			return nil
		default:
		}
		if err := backoff.Retry(restoreAndSubscribe, retryPolicy); err != nil {
			log.Printf("error running: %v", err)
			return err
		}
	}
	return nil
}
// Restore loads the current state from the database into the in-memory store.
//
// In one repeatable-read transaction it reads MAX(version) into a.Version and
// fetches all active ads, then hands those ads to the dispatcher as a batch
// create request and waits for the answer. It returns a database error, the
// error reported by the dispatcher, ErrTimeout when the dispatcher does not
// accept and answer the request within 10 seconds, or ErrUnknown when the
// answer has an unexpected type.
func (a *AdService) Restore() (err error) {
	txn := a.db.Begin(&sql.TxOptions{Isolation: sql.LevelRepeatableRead})
	if err = txn.Error; err != nil {
		return err
	}
	// Roll back on every failure so the transaction is not left open.
	if err = txn.Raw("SELECT COALESCE(MAX(version), 0) FROM ads").Scan(&a.Version).Error; err != nil {
		txn.Rollback()
		return err
	}
	var ads []*model.Ad
	if err = txn.Where("is_active = ?", true).Find(&ads).Error; err != nil {
		txn.Rollback()
		return err
	}
	if err = txn.Commit().Error; err != nil {
		return err
	}

	requestID := uuid.New().String()
	respCh := make(chan interface{}, 1)
	a.dispatcher.ResponseChan.Store(requestID, respCh)
	defer a.dispatcher.ResponseChan.Delete(requestID)

	// One deadline covers both handing over the request and waiting for the
	// answer, so a stalled dispatcher cannot block Restore forever.
	timeout := time.After(10 * time.Second)
	select {
	case a.dispatcher.RequestChan <- &dispatcher.CreateBatchAdRequest{
		Request: dispatcher.Request{RequestID: requestID},
		Ads:     ads,
	}:
	case <-timeout:
		return ErrTimeout
	}

	select {
	case resp := <-respCh:
		created, ok := resp.(*dispatcher.CreateAdResponse)
		if !ok {
			return ErrUnknown
		}
		if created.Err == nil {
			log.Printf("Restored version: %d successfully\n", a.Version)
		}
		return created.Err
	case <-timeout:
		return ErrTimeout
	}
}
// Subscribe implements model.AdService.
//
// Starting after the stream ID "0-<a.Version>", it reads the ad stream in
// blocking batches until shutdown. Each message whose version is newer than
// a.Version advances a.Version and is forwarded to the dispatcher as a create
// or delete request; older messages and messages that cannot be decoded are
// skipped with a log entry. Subscribe returns nil when the service shuts down.
func (a *AdService) Subscribe() error {
	log.Printf("subscribing to redis stream with offset: %d", a.Version)
	ctx := context.Background()
	lastID := fmt.Sprintf("0-%d", a.Version) // Assuming offset can be mapped directly to Redis Stream IDs

	stopCh := make(chan struct{}, 1)
	a.wg.Add(1)
	defer a.wg.Done()
	a.registerOnShutdown(func() {
		close(stopCh)
	})

	for !a.shutdown.Load() {
		select {
		case <-stopCh:
			return nil
		default:
		}

		streams, err := a.redis.XRead(ctx, &redis.XReadArgs{
			Streams: []string{a.adStream, lastID},
			Block:   3 * time.Second,
			Count:   10,
		}).Result()
		if err != nil {
			// redis.Nil only means the blocking read found nothing new. Any
			// other error is logged, and the loop pauses so that a persistent
			// failure does not turn into a busy loop.
			if err != redis.Nil {
				log.Printf("error reading from redis: %v", err)
				select {
				case <-stopCh:
					return nil
				case <-time.After(time.Second):
				}
			}
			continue
		}

		for _, stream := range streams {
			for _, m := range stream.Messages {
				log.Printf("received message: %v\n", m)

				// The version is the sequence part of the "<ms>-<seq>" ID.
				_, seq, found := strings.Cut(m.ID, "-")
				if !found {
					log.Printf("skipping message with malformed id %q", m.ID)
					continue
				}
				streamVersion, err := strconv.ParseInt(seq, 10, 64)
				if err != nil {
					log.Printf("skipping message with malformed id %q: %v", m.ID, err)
					continue
				}
				if a.Version >= int(streamVersion) {
					// our version is the same or higher than the stream version
					continue
				}
				a.Version = int(streamVersion)
				lastID = m.ID

				// Checked assertions: a missing or non-string field must not
				// panic the subscriber.
				msgType, _ := m.Values["type"].(string)
				body, _ := m.Values["ad"].(string)

				switch msgType {
				case "create":
					payload := &dispatcher.CreateAdRequest{}
					if err := json.Unmarshal([]byte(body), payload); err != nil {
						log.Printf("skipping create message %s: %v", m.ID, err)
						continue
					}
					if payload.Ad == nil {
						log.Printf("skipping create message %s: payload has no ad", m.ID)
						continue
					}
					a.dispatcher.RequestChan <- payload
				case "delete":
					payload := &dispatcher.DeleteAdRequest{}
					if err := json.Unmarshal([]byte(body), payload); err != nil {
						log.Printf("skipping delete message %s: %v", m.ID, err)
						continue
					}
					a.dispatcher.RequestChan <- payload
				default:
					log.Printf("unknown message type: %s", msgType)
				}
			}
		}
	}
	return nil
}
// registerOnShutdown appends f to the callbacks launched by Shutdown.
func (a *AdService) registerOnShutdown(f func()) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.onShutdown = append(a.onShutdown, f)
}
// onShutdownNum returns the number of registered shutdown callbacks.
func (a *AdService) onShutdownNum() int {
	a.mu.Lock()
	registered := len(a.onShutdown)
	a.mu.Unlock()
	return registered
}
// storeAndPublishWithLock
//
// 1. locks the lockKey
//
// 2. stores the ad in the database, and sets the version of the new ad to `SELECT MAX(version) FROM ads` + 1
//
// 3. publishes the ad into redis stream. (ensure the message sequence number is the same as the ad's version)
//
// 4. releases the lock
//
// The first lock, database, encoding or Redis error is returned. The incoming
// ctx is replaced by context.Background() before use.
func (a *AdService) storeAndPublishWithLock(ctx context.Context, ad *model.Ad, requestID string) (err error) {
	ctx = context.Background()

	retry := redislock.LimitRetry(redislock.ExponentialBackoff(1*time.Millisecond, 5*time.Millisecond), 10)
	lock, err := a.locker.Obtain(ctx, a.lockKey, 100*time.Millisecond, &redislock.Options{RetryStrategy: retry})
	if err != nil {
		log.Printf("error obtaining lock: %v", err)
		return err
	}
	defer func() {
		if releaseErr := lock.Release(ctx); releaseErr != nil {
			log.Printf("error releasing lock: %v", releaseErr)
		}
	}()

	txn := a.db.Begin()
	if err = txn.Error; err != nil {
		return err
	}

	var maxVersion int
	err = txn.Raw("SELECT COALESCE(MAX(version), 0) FROM ads").Scan(&maxVersion).Error
	if err != nil {
		txn.Rollback()
		return err
	}

	ad.Version = maxVersion + 1
	err = txn.Create(ad).Error
	if err != nil {
		txn.Rollback()
		return err
	}
	if err = txn.Commit().Error; err != nil {
		return err
	}

	// The stream message carries the whole create request as JSON.
	encoded, err := json.Marshal(&dispatcher.CreateAdRequest{
		Request: dispatcher.Request{RequestID: requestID},
		Ad:      ad,
	})
	if err != nil {
		log.Printf("error marshalling ad request: %v", err)
		return err
	}

	publish := &redis.XAddArgs{
		Stream:     a.adStream,
		NoMkStream: false,
		Approx:     false,
		MaxLen:     100000,
		Values: []interface{}{
			"ad", string(encoded),
			"type", "create",
		},
		ID: fmt.Sprintf("0-%d", ad.Version),
	}
	if _, err = a.redis.XAdd(ctx, publish).Result(); err != nil {
		log.Printf("error publishing to redis: %v", err)
		return err
	}
	return nil
}
// CreateAd implements model.AdService.
//
// It registers a response channel, stores and publishes the ad, enqueues the
// ad's delete task and then waits up to three seconds for the dispatcher's
// answer to the published create request. It returns the ad ID and error of
// that answer, ErrTimeout when no answer arrives in time, or ErrUnknown when
// the answer has an unexpected type.
func (a *AdService) CreateAd(ctx context.Context, ad *model.Ad) (adID string, err error) {
	a.wg.Add(1)
	defer a.wg.Done()

	requestID := uuid.New().String()
	a.dispatcher.ResponseChan.Store(requestID, make(chan interface{}, 1))
	defer a.dispatcher.ResponseChan.Delete(requestID)

	if err = a.storeAndPublishWithLock(ctx, ad, requestID); err != nil {
		return "", err
	}
	if err = a.registerAdDeleteTask(ad); err != nil {
		return "", err
	}

	select {
	case <-time.After(3 * time.Second):
		return "", ErrTimeout
	case raw := <-a.dispatcher.ResponseChan.Load(requestID):
		created, ok := raw.(*dispatcher.CreateAdResponse)
		if !ok {
			return "", ErrUnknown
		}
		return created.AdID, created.Err
	}
}
// registerAdDeleteTask enqueues a delete task for ad, to be processed at the
// ad's EndAt. The task ID is "<payload type name>-<ad ID>".
func (a *AdService) registerAdDeleteTask(ad *model.Ad) error {
	adID := ad.ID.String()
	payload := &model.AsynqDeletePayload{AdID: adID}

	task, err := payload.ToTask()
	if err != nil {
		return err
	}

	opts := []asynq.Option{
		asynq.ProcessAt(ad.EndAt.T()),
		asynq.TaskID(fmt.Sprintf("%s-%s", payload.TypeName(), adID)),
	}
	_, err = a.asynqClient.Enqueue(task, opts...)
	return err
}
// GetAds implements model.AdService.
//
// It sends req to the dispatcher and waits for the answer. A single
// three-second deadline covers both handing over the request and receiving
// the response. It returns the ads, total and error of the response,
// ErrTimeout when the deadline passes, ctx.Err() when ctx ends first, or
// ErrUnknown when the response has an unexpected type.
func (a *AdService) GetAds(ctx context.Context, req *model.GetAdRequest) ([]*model.Ad, int, error) {
	a.wg.Add(1)
	defer a.wg.Done()

	requestID := uuid.New().String()
	respCh := make(chan interface{}, 1)
	a.dispatcher.ResponseChan.Store(requestID, respCh)
	defer a.dispatcher.ResponseChan.Delete(requestID)

	timer := time.NewTimer(3 * time.Second)
	defer timer.Stop()

	// RequestChan is unbuffered: without a deadline here a busy or stopped
	// dispatcher would block the caller indefinitely.
	select {
	case a.dispatcher.RequestChan <- &dispatcher.GetAdRequest{
		Request:      dispatcher.Request{RequestID: requestID},
		GetAdRequest: req,
	}:
	case <-timer.C:
		return nil, 0, ErrTimeout
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	}

	select {
	case raw := <-respCh:
		resp, ok := raw.(*dispatcher.GetAdResponse)
		if !ok {
			return nil, 0, ErrUnknown
		}
		return resp.Ads, resp.Total, resp.Err
	case <-timer.C:
		return nil, 0, ErrTimeout
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	}
}
// NewAdService wires an AdService from its dependencies. The lock key is
// "lock:ad", the stream name is "ad", and the version starts at 0.
func NewAdService(dispatcher *dispatcher.Dispatcher, db *gorm.DB, redis *redis.Client, locker *redislock.Client, asynqClient *asynq.Client) model.AdService {
	svc := &AdService{
		dispatcher:  dispatcher,
		db:          db,
		redis:       redis,
		locker:      locker,
		asynqClient: asynqClient,
		lockKey:     "lock:ad",
		adStream:    "ad",
		onShutdown:  []func(){},
	}
	return svc
}
package service
import (
"context"
"dcard-backend-2024/pkg/model"
"encoding/json"
"github.com/hibiken/asynq"
)
// TaskService handles the asynq tasks of the ad domain.
type TaskService struct {
// adService performs the deletion requested by a delete task.
adService model.AdService
}
// HandleDeleteAd implements model.TaskService.
//
// It decodes the task payload as a model.AsynqDeletePayload and deletes the
// ad it names through the ad service. A decoding error or the service's error
// is returned.
func (svc *TaskService) HandleDeleteAd(ctx context.Context, t *asynq.Task) error {
	payload := model.AsynqDeletePayload{}
	err := json.Unmarshal(t.Payload(), &payload)
	if err != nil {
		return err
	}
	return svc.adService.DeleteAd(ctx, payload.AdID)
}
// RegisterTaskHandler registers HandleDeleteAd on mux under the type name of
// model.AsynqDeletePayload.
func (svc *TaskService) RegisterTaskHandler(mux *asynq.ServeMux) {
	pattern := model.AsynqDeletePayload{}.TypeName()
	mux.HandleFunc(pattern, svc.HandleDeleteAd)
}
// NewTaskService returns a TaskService that deletes ads through adService.
func NewTaskService(adService model.AdService) model.TaskService {
	svc := new(TaskService)
	svc.adService = adService
	return svc
}
package syncmap
import "sync"
// Map is a typed wrapper around sync.Map that stores channels of interface{}
// under string keys.
type Map struct {
	syncMap sync.Map
}

// LoadOrStore returns the channel already stored under key, if any; otherwise
// it stores value and returns it. The boolean reports whether the channel was
// already present.
func (m *Map) LoadOrStore(key string, value chan interface{}) (chan interface{}, bool) {
	actual, loaded := m.syncMap.LoadOrStore(key, value)
	ch := actual.(chan interface{})
	return ch, loaded
}

// Load returns the channel stored under key, or nil when there is none.
func (m *Map) Load(key string) chan interface{} {
	stored, found := m.syncMap.Load(key)
	if !found {
		return nil
	}
	return stored.(chan interface{})
}

// Exists reports whether a channel is stored under key.
func (m *Map) Exists(key string) bool {
	_, found := m.syncMap.Load(key)
	return found
}

// Store saves value under key, replacing any previous channel.
func (m *Map) Store(key string, value chan interface{}) {
	m.syncMap.Store(key, value)
}

// Delete removes the channel stored under key, if any.
func (m *Map) Delete(key string) {
	m.syncMap.Delete(key)
}