Go GraphQl Redis PubSub でリアルタイムチャット (gqlgen websocket)

今回は色々なWebサービスで見かけるチャットアプリケーションのバックエンドサーバーを構築していきたいと思います。 様々な技術を使用しますが、主にGoとgo-redisを用いた実装に着目します。

Redis PubSub

RedisのPub/SubはRedisの主要機能の一つです。 あるチャネルに対してSub(サブスクライブ)しているとそのチャネルに Pub(パブリッシュ)したメッセージを受け取ることができます。 この概念はNoSQLの不慣れなデータ構造などを覚える必要はありません。

なぜPub/Subを使うのか?
なぜRDBではなくPub/Sub機能を使うのか? 確かにRDBでも データを保存しそのデータを取得すればチャット機能は実現できます。 しかしその場合 保存と取得という2回分のSQLが走る上に、それを通知するタイミングも決める必要があります。Pub/Subを使えば、例えばRedisではとても単純な操作でできてしましいます。

以前にまとめた記事を書いたのでより詳しく知りたい方はぜひこちらをご覧ください。

Redis PubSub についてまとめてみた。 - matsu tech blog

WebSocket

webSocketはHTTP通信とは別のプロトコルで双方向通信を行う仕組みです。 Pub/Subと組み合わせることで複数サーバー間でのスケールを可能にします。

今さら聞けないWebSocket~WebSocketとは~ - Qiita

RedisのPub/Subで異なるコンテナ間のWebSocketを同期する-どらごんテック

それではなぜスケールを前提にしていない今回のチャットアプリケーションにwebSocketが必要なのか?

それはGraphQLのライブラリで使用する gqlgen の Subscription(このSubscriptionはRedisのPub/Subとは別です。)はwebSocket通信を介して行われるからです。

gqlgen

gqlgenは GoのでGraphQLを実装する際によく使用されるライブラリです。 スキーマ駆動開発で型を自動生成してくれます。 Building GraphQL servers in golang — gqlgen

ただ、上記のドキュメントにはSubscriptionについて触れられておりません。

別途検索したところ該当するISSUEや記事を見つけました。 Can't find docs/examples of Subscriptions · Issue #953 · 99designs/gqlgen · GitHub

How to handle GraphQL subscriptions with Go, GQLgen and MongoDB

GQLgen provides us with some built-in handlers like Playground and WebsocketUpgrader which essentially creates a UI for testing our GraphQL server and for having a WebSocket connection with the clients.

つまり、gqlgenにはSubscriptionのためのwebsocket機能が提供されているのでそれを使おうということですね。

以上が今回使う技術ですが、一つ一つを深く知る必要はありません。 焦点はGoでPub/Subを実装するにはどうするかです。 実際コーディングする部分はgo-redisというクライアントライブラリをメインに使っていく事になります。

本題

前置きが長くなりましたが、本題です。 スキーマ駆動開発で

  • メッセージを通知するSubscription

  • メッセージをPub(パブリッシュ)するためのMutation

を実装していきたいと思います。

コードを見たい方はこちらからご覧頂けます。

github.com

それでは実装していきます。

環境構築

まずはwebSocket通信を確保するためにhandlerにwebsocketを追加します。 webフレームワークにechoを使用しています。

handler.go

package server

import (
    "net/http"
    "time"

    "github.com/99designs/gqlgen/graphql/playground"
    "github.com/99designs/gqlgen/handler"
    "github.com/backend/graph/generated"
    "github.com/gorilla/websocket"
    "github.com/labstack/echo"
)

func GraphqlHandler(resolver generated.ResolverRoot, directive generated.DirectiveRoot) echo.HandlerFunc {
    c := generated.Config{
        Resolvers:  resolver,
        Directives: directive,
    }

    h := handler.GraphQL(
        generated.NewExecutableSchema(c),
        handler.WebsocketKeepAliveDuration(10*time.Second),
        handler.WebsocketUpgrader(websocket.Upgrader{
            CheckOrigin: func(r *http.Request) bool {
                return true
            },
        }),
    )

    // NOTE: handler.Newが推奨だが playgroundのdocsが読み込まれない?
    // h := handler.New(
    // h.AddTransport(transport.POST{}) // https://zenn.dev/konboi/articles/ee8ec5c27b98576de3db

    return func(c echo.Context) error {
        h.ServeHTTP(c.Response(), c.Request())
        return nil
    }
}

以下に注目して下さい。

handler.WebsocketKeepAliveDuration(10*time.Second),
handler.WebsocketUpgrader(websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}),

WebsocketKeepAliveDuration()で通信を永続化しています。これが無いと数十秒で通信が切断されてしまいます。

WebsocketUpgrader()でwebsocket通信の構築を行なっています。

次にRedisを構築します。 以下コードです。 ライブラリに "github.com/go-redis/redis/v8" を使用します。

redis.go

package redis

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"

    "github.com/go-redis/redis/v8"
)

func New() *Client {
    dbConf, _ := strconv.Atoi(os.Getenv("REDIS_DB"))

    client := redis.NewClient(&redis.Options{
        Addr:     os.Getenv("REDIS_URL"),
        Password: os.Getenv("REDIS_PASSWORD"),
        DB:       dbConf,
    })

    fmt.Println(os.Getenv("REDIS_URL"), os.Getenv("REDIS_PASSWORD"), os.Getenv("REDIS_DB"))
    context := context.Background()

    err := client.Ping(context).Err()

    if err != nil {
        log.Fatal("failed to connect redis", err)
    }

    log.Print("success to connect redis!")

    return client
}

さてwebSocketとRedisの準備ができました。

次はResolver(APIの実装部分)の中でRedisを使いたいので構造体を作ります。

message.go

type MessageSubscriber struct {
    client       *redis.Client
    msgChannels  map[string]chan *gmodel.Message
    mutex        sync.Mutex
}

func NewMessageSubscriber(ctx context.Context, client *redis.Client) *MessageSubscriber {
    subscriber := &MessageSubscriber{
        client:       client,
        msgChannels:  map[string]chan *gmodel.Message{},
        mutex:        sync.Mutex{},
    }
    subscriber.startSubscribingRedis(ctx)
    return subscriber
}

redisの機能を使いたい時はこの構造体をResolverに持たせて呼び出すことになります。

また、startSubscribingRedis()では以下のように goroutineを生成して "room" チャネルをサブスクライブしています。

func (m *MessageSubscriber) startSubscribingRedis(ctx context.Context) error {
    var err error
    go func() {
        pubsub := m.client.Subscribe(ctx, "room")
        defer pubsub.Close()

        for {
            msgi, err := pubsub.Receive(ctx)
            if err != nil {
                continue
            }
            switch msg := msgi.(type) {
            case *redis.Message:
                var ms gmodel.Message
                if err := json.Unmarshal([]byte(msg.Payload), &ms); err != nil {
                    continue
                }

                m.mutex.Lock()
                for _, ch := range m.msgChannels {
                    ch <- &ms
                }
                m.mutex.Unlock()
            default:
            }
        }
    }()

    if err != nil {
        return err
    }

    return nil
}

後ほどgqlgenでSubscriptionのResolverを見ていきますが、そのResolverでは Returnする型のchannelを返すようなコードが生成されます。 返したchannelによって上記でsubscribeしている値をよしなに判別してくれるので大変便利です。

main

これまで実装してきたモノをmain関数で呼び出し実行します。 他にもmysqlやmiddleware等が登場していますが、説明は割愛させて頂きます。

func main() {
    db, err := rdb.InitDB()
    if err != nil {
        panic(err.Error())
    }
    redis := redis.New()

    subscribers := resolver.Subscribers{
        Message: subscriber.NewMessageSubscriber(context.Background(), redis),
    }
    loader := middleware.NewDataloader(db)

    middlewares := []echo.MiddlewareFunc{
        middleware.Authorize(),
        middleware.NewCors(),
        loader.InjectStoreStatusLoader(),
    }

    resolver := resolver.New(db, subscribers)
    directive := directive.New(db)
    graphqlHandler := server.GraphqlHandler(resolver, directive)
    router := server.NewRouter(graphqlHandler, middlewares)
    server.Run(router)

}

以上で環境構築は終了です。 次はAPIの実装部分であるResolverを生成していきます。

Schema

スキーマを定義します。

schema.graphql

schema {
  # query: Query
  mutation: Mutation
  subscription: Subscription
}


type Mutation {
  joinUser(input: JoinUserInput!): User!
  postMessage(input: PostMessageInput!): Message!
}

type Subscription {
  messagePosted(userID: ID): Message!
}

処理の流れは以下です。

  • クライアントA messagePosted()でSubscribeしてメッセージを待ち受ける。

  • クライアントB joinUser()でチャネルに参加する。

  • クライアントB postMessage()でチャネルにメッセージを送信する。

  • クライアントA messagePosted()でチャネル内のメッセージを受信する。

今回はコメントアウトしているqueryは使用しないので割愛します。 その他にもGitHub上では上記に無いAPIがあったり、RDBでUserを永続化していたりしますが今回は触れません。


型を定義できたので gqlgen のコマンドでresolver等を自動生成します。

func (r *mutationResolver) JoinUser(ctx context.Context, input gmodel.JoinUserInput) (*gmodel.User, error) {
    panic(fmt.Errorf("not implemented"))
}

func (r *subscriptionResolver) MessagePosted(ctx context.Context, userID *string) (<-chan *gmodel.Message, error) {
    panic(fmt.Errorf("not implemented"))
}

func (r *mutationResolver) PostMessage(ctx context.Context, input gmodel.PostMessageInput) (*gmodel.Message, error) {
    panic(fmt.Errorf("not implemented"))
}

実装していくResolverが生成されました。 あとはこのResolverを埋めていくだけです。


user.resolver.go joinUser()

joinUserではrdbからuser検索の処理を行なっています。 setNxメソッドを別を呼び出しメッセージを "room" チャネルにPublishします。

func (r *mutationResolver) JoinUser(ctx context.Context, input gmodel.JoinUserInput) (*gmodel.User, error) {
    var user gmodel.User
    if err := r.db.First(&user, input.UserID).Error; err != nil {
        return nil, err
    }

    if err := r.subscribers.Message.SetNx(ctx, user.ID); err != nil {
        return nil, err
    }

    return &user, nil
}
func (m *MessageSubscriber) SetNx(ctx context.Context, userID string) error {
    val, err := m.client.SetNX(ctx, userID, userID, 60*time.Minute).Result()
    if err != nil {
        log.Println(err)
        return err
    }
    if !val {
        return errors.New("this user name has already used")
    }

    return nil
}

setNxは Redisの SET コマンドにNXオプションを付与したメソッドです。

SET – Redis 日本語訳

ここではKeyValue のSETが登場していますが、Pub/Subは発生していません。 NXオプションでuserがチャットに既に参加しているかどうかを判定しKeyが存在しなければKeyValueStoreに登録しています。


message.resolver.go PostMessage()

func (r *mutationResolver) PostMessage(ctx context.Context, input gmodel.PostMessageInput) (*gmodel.Message, error) {
    msgSubscriber := r.subscribers.Message
    isJoined, err := msgSubscriber.CheckJoined(ctx, input.UserID)
    if err != nil || !isJoined {
        return nil, err
    }

    isSet, err := msgSubscriber.SetExpire(ctx, input.UserID)
    if err != nil || !isSet {
        return nil, err
    }

    var user gmodel.User
    if err := r.db.First(&user, input.UserID).Error; err != nil {
        return nil, err
    }

    m := &gmodel.Message{
        ID:      user.ID,
        User:    &user,
        Message: input.Message,
    }

    if err := msgSubscriber.PublishMsg(ctx, m); err != nil {
        return nil, err
    }

    return m, nil
}
func (m *MessageSubscriber) PublishMsg(ctx context.Context, msg *gmodel.Message) error {
    mb, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    m.client.Publish(ctx, "room", mb)
    return nil
}

PostMessageではjson形式でメッセージとuser情報を整形して "room" チャネルにPublishしています。


subscription.resolver.go MessagePoseted()

func (r *subscriptionResolver) MessagePosted(ctx context.Context, userID *string) (<-chan *gmodel.Message, error) {
    msgSubscriber := r.subscribers.Message
    isJoined, err := msgSubscriber.CheckJoined(ctx, *userID)
    if err != nil || !isJoined {
        return nil, err
    }

    msgChan := msgSubscriber.MakeChan(ctx, userID)
    return msgChan, nil
}
func (m *MessageSubscriber) MakeChan(ctx context.Context, userID *string) <-chan *gmodel.Message {
    messageChan := make(chan *gmodel.Message, 1)
    m.mutex.Lock()
    m.msgChannels[*userID] = messageChan
    m.mutex.Unlock()

    go func() {
        <-ctx.Done()
        m.mutex.Lock()
        delete(m.msgChannels, *userID)
        m.mutex.Unlock()
        m.client.Del(ctx, *userID)
    }()

    return messageChan
}

チャネルを生成しています。 またここでもgoroutineを作成し、接続が切れた時にはチャネルとRedisのKeyを削除しています。

以上でPub/Subの実装ができました。

PlayGround

では実装したサーバーを起動して見ます。

無事にPub/Subでメッセージの送受信を行うことができました。 実装は以上となります。


最後に、 このブログではweb開発について発信していくのでまたご覧頂けると嬉しいです。 最後までお読み頂きありがとうございました。