GraphQL Subscriptionを Redis KeySpaceで実装する ~その2~

今回は 前回に続き GraphQLのSubscriptionをRedisのkeyspace notificationを使って実装していきたいと思います。

前回はkeyspaceの概要編でしたが今回は実践編です。 以下の技術を使用します。

- Go
- gqlgen v0.13.0
- go-redis v8

今回実装する機能

今回は Redis keyspaceを使った Userの状態管理機能 を実装していきます。

機能の詳細

  • userはONLINEかOFFLINEの状態を持つ
  • userはONLINE状態を更新できる(ONLINE状態にすることができる)
  • ONLINE状態の時に一定の期間何も操作しなければOFFLINEになる
  • userの状態が変更する度にその状態を配信、通知する事ができる

以上が機能の詳細となります。

Redisのデータ設計

実装に入る前にRedisのデータ形式を設計しておく必要があります。

筆者は以下のように設計しました。

KEY {userID}
VALUE ONLINE

KEY VALUE方式で userIDに対して 状態を ONLINE にします。

OFFLINE状態の時はRedisのデータが無い状態(nil)なので今回はONLINEのみとなっいます。

処理の流れをredis-cliで見ていきます。

userID が 1のuserを ONLINE にする場合

PSUBSCRIBE __keyspace@0__:*
SET 1 ONLINE EX 6

keyspaceでは以下のように通知されます。

"__keyspace@0__:1"
"set"

"__keyspace@0__:1"
"expired"

このような状態管理をアプリケーションサーバーで実装していくのが今回の内容です。

スキーマ定義

gqlgenはスキーマ駆動開発のライブラリです。 まずはスキーマを定義していきましょう。

定義するのは以下のMutationとSubscriptionです。

Mutation
updateUserStatus(input: updateUserStatusInput!): UserStatus!

Userの状態をONLINEにします。

Subscription  
userStatusChanged(userId: String!): UserStatus!

Userの状態の変更を通知します。

それでは実際にコードを書いていきます。

schema.graphql

type Mutation {
  updateUserStatus(input: updateUserStatusInput!): UserStatus!
}

type Subscription {
  userStatusChanged(userId: String!): UserStatus!
}

さらにUserStatus タイプを定義します。

userStatus.graphql

type UserStatus {
  userId: ID!
  status: Status!
}

enum Status {
  ONLINE
  OFFLINE
}

input updateUserStatusInput {
  userID: ID!
}

今回はONLINEとOFFLINEのみですがさらに複雑な状態を管理することもできそうです。

以上でスキーマ定義が完了しました。 次にコマンドラインからResolverを生成します。

$ gqlgen
func (r *mutationResolver) UpdateUserStatus(ctx context.Context, input gmodel.UpdateUserStatusInput) (*gmodel.UserStatus, error) {
  panic("not implemented")
}

func (r *subscriptionResolver) UserStatusChanged(ctx context.Context, userID string) (<-chan *gmodel.UserStatus, error) {
  panic("not implemented")
}

無事にResolverを生成することができました。 次にResolverの中身を実装していきたいところですが、その前にRedisのkeyspaceの実装を行なっていきます。

UserStatusSubscriberの実装

この節ではアプリケーションが開始された際に実行されるRedis keyspaceを実装していきます。

まずはUserStatusSubscriberという構造体を作成します。

userStatus.go

type UserStatusSubscriber struct {
    Client             *redis.Client
    UserStatusChannels map[string]chan *gmodel.UserStatus
    Mutex              sync.Mutex
}

func NewUserStatusSubscriber(ctx context.Context, client *redis.Client) *UserStatusSubscriber {
    subscriber := &UserStatusSubscriber{
        Client:             client,
        UserStatusChannels: map[string]chan *gmodel.UserStatus{},
        Mutex:              sync.Mutex{},
    }
    subscriber.startSubscribingRedis(ctx)
    return subscriber
}

この構造体をmain.goで作成し、resolverの構造体に注入します。

main.go

    subscribers := resolver.Subscribers{
        UserStatus: subscriber.NewUserStatusSubscriber(context.Background(), redis),
    }

    resolver := resolver.New(subscribers)

NewUserStatusSubscriber() 関数の subscriber.startSubscribingRedis(ctx) に着目して下さい。 アプリケーションの開始時にmain.goからこの関数が呼び出されます。 この関数は次の説明で実装していきます。 また、Resolverの構造体にSubscriberを持たせることで各Resolverメソッドの中でSubscriberを使用できるようになりました。

次に startSubscribingRedis() を実装していきます。

startSubscribingRedis()

このメソッドではgoroutine とkeyspaceの機能を使用してRedisのデータの変更を待ち受ける処理を作成していきます。

func (m *UserStatusSubscriber) startSubscribingRedis(ctx context.Context) error {
    go func() {
        pubsub := m.Client.PSubscribe(ctx, "__keyspace@0__:*")
        defer pubsub.Close()

        ch := pubsub.Channel()

        for {
            select {
            case <-ctx.Done():
            case msg := <-ch:
                switch msg.Payload {
                case "set":
                    prefix := "__keyspace@0__:"
                    userID := strings.TrimPrefix(msg.Channel, prefix)
                    status, err := m.Client.Get(ctx, userID).Result()
                    if err != nil {
                        fmt.Println("Redis Error GET:", err)
                        continue
                    }

                    userStatus := &gmodel.UserStatus{
                        UserID: userID,
                    }
                    switch status {
                    case "ONLINE":
                        userStatus.Status = gmodel.StatusOnline
                    }

                    m.Mutex.Lock()
                    for _, ch := range m.UserStatusChannels {
                        ch <- userStatus
                    }
                    m.Mutex.Unlock()
                case "expired":
                    prefix := "__keyspace@0__:"
                    userID := strings.TrimPrefix(msg.Channel, prefix)
                    userStatus := &gmodel.UserStatus{
                        UserID: userID,
                        Status: gmodel.StatusOffline,
                    }

                    m.Mutex.Lock()
                    for _, ch := range m.UserStatusChannels {
                        ch <- userStatus
                    }
                    m.Mutex.Unlock()
                // case "expire":
                // case "del":
                default:
                }
            }
        }
    }()

順番に解説していきます。*冗長なので適宜リファクタリングして下さい。

pubsub := m.Client.PSubscribe(ctx, "__keyspace@0__:*")
defer pubsub.Close()
ch := pubsub.Channel()

m.Clientには go-redisのClientを持たせています。 PSubscribeメソッドで __keyspace@0__:* を待ち受けることで 全てのKEYに対して何のイベント(SET,DEL等)が発生したか通知を受けることができます。

これはredis-cliの以下のコマンドと等価です。

PSUBSCRIBE __keyspace@0__:*

次に無限ループとselect caseで Subscribeしているチャネルに対して分岐を行います。

for {
  select {
  case <-ctx.Done():
  case msg := <-ch:

<-ctx.Done() はチャネルが終了した時を示します。 今回の実装ではアプリケーションが起動している時にこのチャネルが終了することは無いという前提なので特に処理を書いていません。

msg := <-ch: ではチャネルにデータが配信された事を示しています。 そこからさらに 検知したデータの種類によって処理を分岐していきます。

今回はRedis のイベントの中でも SETとEXPIREDが発生した時の処理を実装します。 条件分岐は以下のように行います。

switch msg.Payload {
case "set":
//...
case "expired":
//...

次にそれぞれの処理です。

イベントSETの場合

case "set":
    prefix := "__keyspace@0__:"
    userID := strings.TrimPrefix(msg.Channel, prefix)
    status, err := m.Client.Get(ctx, userID).Result()
    if err != nil {
        fmt.Println("Redis Error GET:", err)
        continue
    }

    userStatus := &gmodel.UserStatus{
        UserID: userID,
    }
    switch status {
        case "ONLINE":
            userStatus.Status = gmodel.StatusOnline
    }

    m.Mutex.Lock()
    for _, ch := range m.UserStatusChannels {
        ch <- userStatus
    }
    m.Mutex.Unlock()

データがSETされた場合 msg.Channel (__keyspace@0__:{userID}) から userIDを抜き出し userIDがKEYとなっているRedisデータのValueを取得します。

今回の場合VALUEには ONLINEのみしか入ってきませんが例えば状態がより多くなった時この処理で見分けることになります。

次に取得したデータとuserIDを使用しuserStatusを生成します。

userStatusを UserStatusChannels に流し込む事で Subscription(gqlgen)しているクライアントに配信します。

イベントEXPIREDの場合

case "expired":
    prefix := "__keyspace@0__:"
    userID := strings.TrimPrefix(msg.Channel, prefix)
    userStatus := &gmodel.UserStatus{
          UserID: userID,
             Status: gmodel.StatusOffline,
    }

    m.Mutex.Lock()
    for _, ch := range m.UserStatusChannels {
        ch <- userStatus
    }
    m.Mutex.Unlock()

SET時にはttlを設定しデータの期限を定めています。 同一のKEYが上書きされなければSET時に設定した期限がきた時にデータはEXPIREDとなります。

今回の設計ではEXPIREDされた時はUserはOFFLINEのstatusになるので、EXPIREDしたチャネルからuserIDを取得しOFFLINEにして UserStatusChannelsに配信します。

以上でkeyspaceの実装が完了しました。 今回はSET,EXPIREDのみの処理ですが case "del": などさらに処理を追加する事も可能です。

Resolverの実装

あとはResolverを埋めていくだけです。

まずはMutationを実装しましょう。

func (r *mutationResolver) UpdateUserStatus(ctx context.Context, input gmodel.UpdateUserStatusInput) (*gmodel.UserStatus, error) {
    userStatusSubs := r.subscribers.UserStatus

    if err := userStatusSubs.Client.Set(
        ctx, input.UserID,
        string(gmodel.StatusOnline),
        time.Millisecond*time.Duration(6000),
    ).Err(); err != nil {
        return nil, err
    }

    return &gmodel.UserStatus{
        UserID: input.UserID,
        Status: gmodel.StatusOnline,
    }, nil
}

上記のように引数から userIDを KEY として ttlと共にデータをSETします。 redis-cliの以下のコマンドと等価になります。

SET {userID} {status} EX {ttl}
SET 1 ONLINE EX 6

シンプルですね。

この時にSETのイベントが発生しているので先ほど実装した goroutineの中で処理が実行されuserIDが同じchannelをSubscribeしているクライアントに userStatusが配信されます。

次にSubscriptionを実装していきます。

func (r *subscriptionResolver) UserStatusChanged(ctx context.Context, userID string) (<-chan *gmodel.UserStatus, error) {
    userStatusSubs := r.subscribers.UserStatus

    userStatusSubs.Mutex.Lock()
    channels, ok := userStatusSubs.UserStatusChannels[userID]
    if !ok {
        channels = make(chan *gmodel.UserStatus)
        userStatusSubs.UserStatusChannels[userID] = channels
    }
    userStatusSubs.Mutex.Unlock()

    go func() {
        <-ctx.Done()
        userStatusSubs.Mutex.Lock()
        delete(userStatusSubs.UserStatusChannels, userID)
        userStatusSubs.Mutex.Unlock()
    }()

    return channels, nil
}

引数のuserIDを元にUserStatusChannels内に同等の keyがあるかを判定し、なければ作成し購読するchannelを返します。

goroutineの中では このチャネルが終了した時にUserStatusChannelsの中からmapを削除するようにしています。

これで全ての実装が完了しました。

PlayGroundで実行してみましょう!

PlayGround

二つのブラウザを開きそれぞれMutationとSubscriptionを実行します。

無事にuserStatusの変更を通知する事ができました! また、Mutationの実行後に何もしなければEXPIREDとなり自動的にOFFLINEが通知されました。

内容は以上となります。

最後に、 このブログではweb開発について発信していくのでまたご覧頂けると嬉しいです。 最後までお読み頂きありがとうございました。