GraphQL Subscriptionを Redis KeySpaceで実装する ~その1~

今回はRedisのkeyspaceを使用してGraphQLのSubscriptionを実装したいと思います。以前にRedisのPub/Subを使って実装した記事を書きました。

Go GraphQl Redis PubSub でリアルタイムチャット (gqlgen websocket) - matsu tech blog

今回はGraphQLのSubscriptionの部分は同じですが、Redis上で通知を行う仕組みをPub/Subからkeyspaceという技術に変更して行いたいと思います。 これによりPub/Subでは単に messageという形式で単純な通知しか受けることしか出来ませんでしたが、keyspaceではRedisの特定のイベント(SETやDEL)に対しても通知を受けることができるようになります。

keyspaceの概要とGraphQLの実装2回に分けてお送りしていきます。今回はkeyspaceの概要編です。

keyspace notifications

keyspace notificationsとは

今までkeyspaceと申し上げてきましたが正確には keyspace notifications と言います。

キー・スペース通知 — Redis Documentation (Japanese Translation)

keyspace notifications は 2.8.0 以上で提供されます。 公式の説明そのままですが、以下の機能を提供します。

keyspace notifications は、Redis データセットに対するなんらかの変更イベントを、クライアントが Pub/Sub チャネルで受け取るための仕組みです。

以下は、受け取れるイベントの例です:

  • あるキーに作用するすべてのコマンド

  • LPUSH 操作を受けたすべてのキー

  • database 0 の中で、expire されたすべてのキー

keyspaceは二つのタイプの通知を受け取ることができます。

 __keyspace@0__:key event
 __keyevent@0__:event key
  • 前者はSubscribeしているkeyに対して行われる全てのイベント(SETやDEL)
  • 後者ではSubscribeしているイベントに対してどのようなKEYを対象にしているか

keyspaceを有効にするには設定を変える必要があります。

この機能は少なからず CPU パワーを使用するため、デフォルトではは無効になっています。通知は redis.conf の ‘notify-keyspace-events’ か CONFIG SET を通して有効にすることができます。

以上がkeyspace notification の説明となります。 説明だけでは理解し辛いのでCLIを叩いて確認してみましょう

redis-cli でkeyspaceを試す

筆者はDocker環境を使います。

FROM redis:latest
COPY ./redis.conf /usr/local/etc/redis/redis.conf
CMD ["redis-server", "/usr/local/etc/redis/redis.conf"]

redis.confファイルに以下を追加しましょう

notify-keyspace-events KEA

それではDockerを起動します。

コンテナにアタッチして二つのターミナルを開いて下さい。

まずは片方のターミナルで __keyspace@0__:hoge を Subscribeします。

subscribe __keyspace@0__:hoge

もう一方のターミナルで SET コマンドを実行します。

set hoge huga

Subscribeしているチャネルで SET の通知を受け取ることができました。


次は __keyevent@0__:set で SETコマンド実行時にどのKEYに対して行われているかを受け取ってみましょう。

subscribe __keyevent@0__:set

setは小文字でないと反応しないので注意して下さい。

次は反対のターミナルで再度SETコマンドを実行します。

set hoge huga

無事 SETコマンドが KEY hoge に対して行われていることを確認することができました。

ここまでkeyspaceの動きを実際にcliを叩いて見てきましたが、最後に応用として PSUBSCRIBE というパターンマッチで通知を受け取る方法を見てみましょう。

psubscribe __keyevent@0__:*

上記のコマンドで SET,DELなど全てのイベントに対してどのKEYが指定されているのか通知を受けることができます。

さらに

psubscribe __keyevent@0__:*

上記のコマンドではすべてのKEYに対してなんのイベントが発生しているのか通知を受け取ることができます。

以上がkeyspaceの説明となります。

次回は実装編です。

最後に、 このブログではweb開発について発信していくのでまたご覧頂けると嬉しいです。 最後までお読み頂きありがとうございました。

追記

実装編はこちら

shikatech.hatenablog.com

Go GraphQL AWS S3への画像アップロード

今回はGraphQLで画像のアップロードを行なっていきたいと思います。 アップロード先のストレージにはAWSのS3を使用します。 尚、この記事ではS3のバケットポリシーやAWS関連については深掘りしません、aws-sdk-go-v2 というGoのaws-sdkを使用しますがコード関連が中心となります。

使用技術

  • gqlgen v0.13.0
  • aws-sdk-go-v2

*このブログでは今までにも何度かgqlgenを使用した内容を紹介しており、 コードは同じレポジトリに随時追加しております。 お時間あればぜひご覧頂けると嬉しいです。

schema

まずはスキーマを定義します。 ファイルは適宜置き換えて下さい。

schema.graphql

type Mutation {
  uploadFile(input: UploadFileInput!): UploadFilePayload!
}

file.graphql

input UploadFileInput {
  file: Upload!
}

type UploadFilePayload {
  uploadedPath: String!
}

シンプルなMutationですが、 inputの Upload! 型に注目して下さい。 これは gqlgenの Scalar type と言って組み込みのカスタム型です。 Upload 型は以下のような構造体を呼び出します。

type Upload struct {
    File        io.Reader
    Filename    string
    Size        int64
    ContentType string
}

Scalar typeを使用するには 任意の *.graphql ファイル にコードを追加する必要があります。

schema.graphql

scalar Upload

これでスキーマ定義ができたので、gqlgen コマンドで型とResolverを生成します。

*筆者はファイルを分割する為に自動生成の機能をオフにしています。

Reolver

func (r *mutationResolver) UploadFile(ctx context.Context, input model.UploadFileInput) (*model.UploadFilePayload, error) {
    panic(fmt.Errorf("not implemented"))
}

models_gen.go

type UploadFileInput struct {
    File graphql.Upload `json:"file"`
}

type UploadFilePayload struct {
    UploadedPath string `json:"uploadedPath"`
}

UploadFileInput の構造体に注目して下さい。 graphql.Uploadを呼び出しています。

これが先ほど紹介した以下の構造体です。

type Upload struct {
    File        io.Reader
    Filename    string
    Size        int64
    ContentType string
}

アップロードするファイルは この構造体としてResolverに渡されます。

次はアップロード先のs3の準備を行います。

s3の準備

今回はアップロード先にs3を使用するので事前にバケットの作成と設定をして下さい。アップロード可能な状態に設定する必要があります。

次に必要なパッケージをダウンロードします。

以下のパッケージをそれぞれ go get して下さい。

 "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/config"

The AWS SDK for Go requires Go 1.15 or later.

sdk-v2 は Goのversion 1.15から対応になります。

aws-sdk-go-v2 は比較的新しいので aws-sdk-go と混同しないようお気をつけ下さい。 自動インポートなどを使用すると v1 の方が呼び出されたりするので注意が必要です。

Getting Started with the AWS SDK for Go V2 | AWS SDK for Go V2

Resolverの実装

事前準備が完了したのであとは実装するのみです。 *今回はレイヤーやモジュール分割を行いません。

func (r *mutationResolver) UploadFile(ctx context.Context, input model.UploadFileInput) (*model.UploadFilePayload, error) {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        return nil, err
    }

    client := s3.NewFromConfig(cfg)
    uploader := manager.NewUploader(client)

    res, err := uploader.Upload(ctx, &s3.PutObjectInput{
        Bucket:      aws.String("bucketName"),
        Key:         aws.String("uploadPath/" + input.File.Filename),
        Body:        input.File.File,
        ContentType: aws.String(input.File.ContentType),
    })
    if err != nil {
        return nil, err
    }

    return &model.UploadFilePayload{
        UploadedPath: res.Location,
    }, nil
}

上から順に解説します。

こちらのコードはawsのconfigを読み込んでいます。 以下のように何もオプションを指定しないことで

   cfg, err := config.LoadDefaultConfig(context.TODO())

以下の順に credential を読みに行きます。

1.Environment variables. Static Credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN) Web Identity Token (AWS_WEB_IDENTITY_TOKEN_FILE)

2.Shared configuration files. SDK defaults to credentials file under .aws folder that is placed in the home folder on your computer. SDK defaults to config file under .aws folder that is placed in the home folder on your computer.

3.If your application uses an ECS task definition or RunTask API operation, IAM role for tasks.

4.if your application is running on an Amazon EC2 instance, IAM role for Amazon EC2.

Configuring the AWS SDK for Go V2 | AWS SDK for Go V2

本番環境と開発環境でcredentialを別の場所に配置しても自動的に読みに行く場所を変更してくれるので便利ですね。

    client := s3.NewFromConfig(cfg)
    uploader := manager.NewUploader(client)

    res, err := uploader.Upload(ctx, &s3.PutObjectInput{
        Bucket:      aws.String("bucketName"),
        Key:         aws.String("uploadPath/" + input.File.Filename),
        Body:        input.File.File,
        ContentType: aws.String(input.File.ContentType),
    })

あとは上記のように inputから s3に渡す構造体を整形し Uploadメソッドでアップロードします。 簡単ですね!

APIを実行

それでは実装したAPIを実行してみましょう。 恐らく現時点でgqlgenのplaygroundには ファイルのアップロード機能は搭載されていません。

その為今回は以下 firecamp というクライアントアプリを使いました。

https://firecamp.io/graphql

画像下部にあるようにファイルを指定して実行します。

無事にアップロードしたpathが返ってきました! filepathをDBに保存すればフロント側から呼び出し画像表示ができますね。

今回の内容は以上です。


最後に、 このブログではweb開発について発信していくのでまたご覧頂けると嬉しいです。 最後までお読み頂きありがとうございました。

Go GraphQl Redis PubSub でリアルタイムチャット (gqlgen websocket)

今回は色々なWebサービスで見かけるチャットアプリケーションのバックエンドサーバーを構築していきたいと思います。 様々な技術を使用しますが、主にGoとgo-redisを用いた実装に着目します。

Redis PubSub

RedisのPub/SubはRedisの主要機能の一つです。 あるチャネルに対してSub(サブスクライブ)しているとそのチャネルに Pub(パブリッシュ)したメッセージを受け取ることができます。 この概念はNoSQLの不慣れなデータ構造などを覚える必要はありません。

なぜPub/Subを使うのか?
なぜRDBではなくPub/Sub機能を使うのか? 確かにRDBでも データを保存しそのデータを取得すればチャット機能は実現できます。 しかしその場合 保存と取得という2回分のSQLが走る上に、それを通知するタイミングも決める必要があります。Pub/Subを使えば、例えばRedisではとても単純な操作でできてしましいます。

以前にまとめた記事を書いたのでより詳しく知りたい方はぜひこちらをご覧ください。

Redis PubSub についてまとめてみた。 - matsu tech blog

WebSocket

webSocketはHTTP通信とは別のプロトコルで双方向通信を行う仕組みです。 Pub/Subと組み合わせることで複数サーバー間でのスケールを可能にします。

今さら聞けないWebSocket~WebSocketとは~ - Qiita

RedisのPub/Subで異なるコンテナ間のWebSocketを同期する-どらごんテック

それではなぜスケールを前提にしていない今回のチャットアプリケーションにwebSocketが必要なのか?

それはGraphQLのライブラリで使用する gqlgen の Subscription(このSubscriptionはRedisのPub/Subとは別です。)はwebSocket通信を介して行われるからです。

gqlgen

gqlgenは GoのでGraphQLを実装する際によく使用されるライブラリです。 スキーマ駆動開発で型を自動生成してくれます。 Building GraphQL servers in golang — gqlgen

ただ、上記のドキュメントにはSubscriptionについて触れられておりません。

別途検索したところ該当するISSUEや記事を見つけました。 Can't find docs/examples of Subscriptions · Issue #953 · 99designs/gqlgen · GitHub

How to handle GraphQL subscriptions with Go, GQLgen and MongoDB

GQLgen provides us with some built-in handlers like Playground and WebsocketUpgrader which essentially creates a UI for testing our GraphQL server and for having a WebSocket connection with the clients.

つまり、gqlgenにはSubscriptionのためのwebsocket機能が提供されているのでそれを使おうということですね。

以上が今回使う技術ですが、一つ一つを深く知る必要はありません。 焦点はGoでPub/Subを実装するにはどうするかです。 実際コーディングする部分はgo-redisというクライアントライブラリをメインに使っていく事になります。

本題

前置きが長くなりましたが、本題です。 スキーマ駆動開発で

  • メッセージを通知するSubscription

  • メッセージをPub(パブリッシュ)するためのMutation

を実装していきたいと思います。

コードを見たい方はこちらからご覧頂けます。

github.com

それでは実装していきます。

環境構築

まずはwebSocket通信を確保するためにhandlerにwebsocketを追加します。 webフレームワークにechoを使用しています。

handler.go

package server

import (
    "net/http"
    "time"

    "github.com/99designs/gqlgen/graphql/playground"
    "github.com/99designs/gqlgen/handler"
    "github.com/backend/graph/generated"
    "github.com/gorilla/websocket"
    "github.com/labstack/echo"
)

func GraphqlHandler(resolver generated.ResolverRoot, directive generated.DirectiveRoot) echo.HandlerFunc {
    c := generated.Config{
        Resolvers:  resolver,
        Directives: directive,
    }

    h := handler.GraphQL(
        generated.NewExecutableSchema(c),
        handler.WebsocketKeepAliveDuration(10*time.Second),
        handler.WebsocketUpgrader(websocket.Upgrader{
            CheckOrigin: func(r *http.Request) bool {
                return true
            },
        }),
    )

    // NOTE: handler.Newが推奨だが playgroundのdocsが読み込まれない?
    // h := handler.New(
    // h.AddTransport(transport.POST{}) // https://zenn.dev/konboi/articles/ee8ec5c27b98576de3db

    return func(c echo.Context) error {
        h.ServeHTTP(c.Response(), c.Request())
        return nil
    }
}

以下に注目して下さい。

handler.WebsocketKeepAliveDuration(10*time.Second),
handler.WebsocketUpgrader(websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}),

WebsocketKeepAliveDuration()で通信を永続化しています。これが無いと数十秒で通信が切断されてしまいます。

WebsocketUpgrader()でwebsocket通信の構築を行なっています。

次にRedisを構築します。 以下コードです。 ライブラリに "github.com/go-redis/redis/v8" を使用します。

redis.go

package redis

import (
    "context"
    "fmt"
    "log"
    "os"
    "strconv"

    "github.com/go-redis/redis/v8"
)

func New() *Client {
    dbConf, _ := strconv.Atoi(os.Getenv("REDIS_DB"))

    client := redis.NewClient(&redis.Options{
        Addr:     os.Getenv("REDIS_URL"),
        Password: os.Getenv("REDIS_PASSWORD"),
        DB:       dbConf,
    })

    fmt.Println(os.Getenv("REDIS_URL"), os.Getenv("REDIS_PASSWORD"), os.Getenv("REDIS_DB"))
    context := context.Background()

    err := client.Ping(context).Err()

    if err != nil {
        log.Fatal("failed to connect redis", err)
    }

    log.Print("success to connect redis!")

    return client
}

さてwebSocketとRedisの準備ができました。

次はResolver(APIの実装部分)の中でRedisを使いたいので構造体を作ります。

message.go

type MessageSubscriber struct {
    client       *redis.Client
    msgChannels  map[string]chan *gmodel.Message
    mutex        sync.Mutex
}

func NewMessageSubscriber(ctx context.Context, client *redis.Client) *MessageSubscriber {
    subscriber := &MessageSubscriber{
        client:       client,
        msgChannels:  map[string]chan *gmodel.Message{},
        mutex:        sync.Mutex{},
    }
    subscriber.startSubscribingRedis(ctx)
    return subscriber
}

redisの機能を使いたい時はこの構造体をResolverに持たせて呼び出すことになります。

また、startSubscribingRedis()では以下のように goroutineを生成して "room" チャネルをサブスクライブしています。

func (m *MessageSubscriber) startSubscribingRedis(ctx context.Context) error {
    var err error
    go func() {
        pubsub := m.client.Subscribe(ctx, "room")
        defer pubsub.Close()

        for {
            msgi, err := pubsub.Receive(ctx)
            if err != nil {
                continue
            }
            switch msg := msgi.(type) {
            case *redis.Message:
                var ms gmodel.Message
                if err := json.Unmarshal([]byte(msg.Payload), &ms); err != nil {
                    continue
                }

                m.mutex.Lock()
                for _, ch := range m.msgChannels {
                    ch <- &ms
                }
                m.mutex.Unlock()
            default:
            }
        }
    }()

    if err != nil {
        return err
    }

    return nil
}

後ほどgqlgenでSubscriptionのResolverを見ていきますが、そのResolverでは Returnする型のchannelを返すようなコードが生成されます。 返したchannelによって上記でsubscribeしている値をよしなに判別してくれるので大変便利です。

main

これまで実装してきたモノをmain関数で呼び出し実行します。 他にもmysqlやmiddleware等が登場していますが、説明は割愛させて頂きます。

func main() {
    db, err := rdb.InitDB()
    if err != nil {
        panic(err.Error())
    }
    redis := redis.New()

    subscribers := resolver.Subscribers{
        Message: subscriber.NewMessageSubscriber(context.Background(), redis),
    }
    loader := middleware.NewDataloader(db)

    middlewares := []echo.MiddlewareFunc{
        middleware.Authorize(),
        middleware.NewCors(),
        loader.InjectStoreStatusLoader(),
    }

    resolver := resolver.New(db, subscribers)
    directive := directive.New(db)
    graphqlHandler := server.GraphqlHandler(resolver, directive)
    router := server.NewRouter(graphqlHandler, middlewares)
    server.Run(router)

}

以上で環境構築は終了です。 次はAPIの実装部分であるResolverを生成していきます。

Schema

スキーマを定義します。

schema.graphql

schema {
  # query: Query
  mutation: Mutation
  subscription: Subscription
}


type Mutation {
  joinUser(input: JoinUserInput!): User!
  postMessage(input: PostMessageInput!): Message!
}

type Subscription {
  messagePosted(userID: ID): Message!
}

処理の流れは以下です。

  • クライアントA messagePosted()でSubscribeしてメッセージを待ち受ける。

  • クライアントB joinUser()でチャネルに参加する。

  • クライアントB postMessage()でチャネルにメッセージを送信する。

  • クライアントA messagePosted()でチャネル内のメッセージを受信する。

今回はコメントアウトしているqueryは使用しないので割愛します。 その他にもGitHub上では上記に無いAPIがあったり、RDBでUserを永続化していたりしますが今回は触れません。


型を定義できたので gqlgen のコマンドでresolver等を自動生成します。

func (r *mutationResolver) JoinUser(ctx context.Context, input gmodel.JoinUserInput) (*gmodel.User, error) {
    panic(fmt.Errorf("not implemented"))
}

func (r *subscriptionResolver) MessagePosted(ctx context.Context, userID *string) (<-chan *gmodel.Message, error) {
    panic(fmt.Errorf("not implemented"))
}

func (r *mutationResolver) PostMessage(ctx context.Context, input gmodel.PostMessageInput) (*gmodel.Message, error) {
    panic(fmt.Errorf("not implemented"))
}

実装していくResolverが生成されました。 あとはこのResolverを埋めていくだけです。


user.resolver.go joinUser()

joinUserではrdbからuser検索の処理を行なっています。 setNxメソッドを別を呼び出しメッセージを "room" チャネルにPublishします。

func (r *mutationResolver) JoinUser(ctx context.Context, input gmodel.JoinUserInput) (*gmodel.User, error) {
    var user gmodel.User
    if err := r.db.First(&user, input.UserID).Error; err != nil {
        return nil, err
    }

    if err := r.subscribers.Message.SetNx(ctx, user.ID); err != nil {
        return nil, err
    }

    return &user, nil
}
func (m *MessageSubscriber) SetNx(ctx context.Context, userID string) error {
    val, err := m.client.SetNX(ctx, userID, userID, 60*time.Minute).Result()
    if err != nil {
        log.Println(err)
        return err
    }
    if !val {
        return errors.New("this user name has already used")
    }

    return nil
}

setNxは Redisの SET コマンドにNXオプションを付与したメソッドです。

SET – Redis 日本語訳

ここではKeyValue のSETが登場していますが、Pub/Subは発生していません。 NXオプションでuserがチャットに既に参加しているかどうかを判定しKeyが存在しなければKeyValueStoreに登録しています。


message.resolver.go PostMessage()

func (r *mutationResolver) PostMessage(ctx context.Context, input gmodel.PostMessageInput) (*gmodel.Message, error) {
    msgSubscriber := r.subscribers.Message
    isJoined, err := msgSubscriber.CheckJoined(ctx, input.UserID)
    if err != nil || !isJoined {
        return nil, err
    }

    isSet, err := msgSubscriber.SetExpire(ctx, input.UserID)
    if err != nil || !isSet {
        return nil, err
    }

    var user gmodel.User
    if err := r.db.First(&user, input.UserID).Error; err != nil {
        return nil, err
    }

    m := &gmodel.Message{
        ID:      user.ID,
        User:    &user,
        Message: input.Message,
    }

    if err := msgSubscriber.PublishMsg(ctx, m); err != nil {
        return nil, err
    }

    return m, nil
}
func (m *MessageSubscriber) PublishMsg(ctx context.Context, msg *gmodel.Message) error {
    mb, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    m.client.Publish(ctx, "room", mb)
    return nil
}

PostMessageではjson形式でメッセージとuser情報を整形して "room" チャネルにPublishしています。


subscription.resolver.go MessagePoseted()

func (r *subscriptionResolver) MessagePosted(ctx context.Context, userID *string) (<-chan *gmodel.Message, error) {
    msgSubscriber := r.subscribers.Message
    isJoined, err := msgSubscriber.CheckJoined(ctx, *userID)
    if err != nil || !isJoined {
        return nil, err
    }

    msgChan := msgSubscriber.MakeChan(ctx, userID)
    return msgChan, nil
}
func (m *MessageSubscriber) MakeChan(ctx context.Context, userID *string) <-chan *gmodel.Message {
    messageChan := make(chan *gmodel.Message, 1)
    m.mutex.Lock()
    m.msgChannels[*userID] = messageChan
    m.mutex.Unlock()

    go func() {
        <-ctx.Done()
        m.mutex.Lock()
        delete(m.msgChannels, *userID)
        m.mutex.Unlock()
        m.client.Del(ctx, *userID)
    }()

    return messageChan
}

チャネルを生成しています。 またここでもgoroutineを作成し、接続が切れた時にはチャネルとRedisのKeyを削除しています。

以上でPub/Subの実装ができました。

PlayGround

では実装したサーバーを起動して見ます。

無事にPub/Subでメッセージの送受信を行うことができました。 実装は以上となります。


最後に、 このブログではweb開発について発信していくのでまたご覧頂けると嬉しいです。 最後までお読み頂きありがとうございました。

Redis PubSub についてまとめてみた。

概要

RedisのPubSubについてまとめてみた。 DockerでCLIを叩いて実際の動きを確認していく。

Pub / Sub とは

https://redis.io/topics/pubsub

redis.io

Pub / Subシステムの登場人物

  • Subscriber (受信者)
  • Publisher ( 送信者)
  • Channel (部屋)
  • Message(送信者が送信、受信者が受信するメッセージ)

Pub / Subの概要

送信者(パブリッシャー)が特定の受信者(サブスクライバー)にメッセージを送信するようにプログラムされていません。むしろ、公開されたメッセージは、サブスクライバーが存在する可能性があるかどうかを知らなくても、チャネルに特徴付けられます。

メールのように特定の宛先ではなく、送信者はChannelに向けてデータを送り、そのChannelで待ち受けている受信者が流れてきたデータの通知を受けるイメージ。

また、RedisはKVSのNoSQLとして有名だがこのPub/Sub機能については別の概念になる為、混同しないように気をつけたい。

Pub / Subはキースペースとは関係ありません。データベース番号を含め、どのレベルでも干渉しないように作られています。 db 10での公開は、db1のサブスクライバーによって聞こえます。 何らかのスコープが必要な場合は、チャネルの前に環境の名前(テスト、ステージング、本番など)を付けます。

環境構築

FROM        reids:latest
EXPOSE      6379
ENTRYPOINT  ["redis-server"]

dockerにアタッチしてコマンドラインを開く

$ docker exec -it  {コンテナ名} bash

$ redis-cli

 
127.0.0.1:6379> 

PubSubコマンドの実践

cliを開いたので実際にコマンドを叩いて試していく。

ターミナルを開きそれぞれ

redis-cli コマンドを実行

左側がSubscriber 右側がPublisherとする。

f:id:shikatech:20210821104743p:plain

SUBSCRIBEコマンド

SUBSCRIBE – Redis

SUBSCRIBE チャネル名...

引数にチャネル名を取る。また複数のチャネルを指定する事で同時にSUBSCRIBEする事ができる。

左側のターミナル

127.0.0.1:6379> SUBSCRIBE foo bar

1) "subscribe"
2) "foo"
3) (integer) 1
1) "subscribe"
2) "bar"
3) (integer) 2

上記の場合 foo, bar 二つのチャネルをSUBSCRIBEしている事になり、 (integer)2 はSUBSCRIBEしているチャネルの数を示している。

PUBLISHコマンド

PUBLISH – Redis

SUBSCRIBE チャネル名 メッセージ

現在左側のターミナルは SUBSCRIBEコマンドで待ち受けている状態なので、 右側のターミナルからメッセージを送信する。

PUBLISH foo hello

f:id:shikatech:20210821112020p:plain

左側のターミナルが以下のように通知された

1) "message"
2) "foo"
3) "hello"

通知されたデータは以下のようになる。

  • 1) 別のクライアントからmessageがpublishされた事を示し、
  • 2) チャネル名
  • 3) messageの内容

ここまでがPub/Subの基本動作となる。

以上、今回は基本Pub/Subの概念と基本コマンドをまとめた。 実際のアプリケーションでは何らかの言語でクライアントライブラリを使用して 実装することになる。

次回はGoとgo-redisを使ってチャット機能を実装していきたい。

gqlgen directive @goField と Global Object Identification

今回はgqlgenの機能を使って Relayの仕様であるGlobal Object Identificationの実装を行なっていく。

Global Object Identificationとは

GraphQLのクライアントライブラリRelayで定められたIDフィールドの実装方法。

relay.dev

ざっくりとした説明は 全スキーマ間(モデル、テーブル)で一意なIDを返そうという方針。

今回の場合だと Task typeのIDを返したいので、 TASK:1 のようにグローバルでユニークなIDを返したい。

この実装をgqlgen(Go) で

directive という機能を用いて実装していく。

筆者は実務でApolloClientを使用しているが、以下の理由で大変便利な為実装している。

• オブジェクトを再取得するため

• Connection を通じてページングを実装するため

• Mutation の結果を予測可能にするため

• クライアント側のもつキャッシュを適切に更新するため

参考: GraphQLスキーマ設計ガイド 第2版 - わかめの自動売り場 - BOOTH

以下実装↓

スキーマ

interface Node {
  id: ID!
}

QueryでIDを含めたオブジェクトを返したい時は上記のNode インターフェースを使用する。

type Query {
  task(id: ID!): Task!
}

type Task implements Node {
  id: ID! 
  title: String!
  note: String!
  completed: Int! # 0 or 1
  created_at: Time!
  updated_at: Time!
}

IDは ID!型であるが、gqlgen.ymlで以下を指定しているため実質string型となる。 intなどを指定すると 同じID型でもint型になる。

models:
  ID:
    model:
      - github.com/99designs/gqlgen/graphql.ID

また、上記のIDにはdirectiveを使用していないがひとまずこのまま進める。

resolver

この状態で gqlgenコマンドを使うと下記Resolverが生成される。

func (r *queryResolver) Task(ctx context.Context, id string) (*model.Task, error) {
    panic(fmt.Errorf("not implemented"))
}

ここでIDを起点にDBからレコードを検索してリターンする。

func (r *queryResolver) Task(ctx context.Context, id string) (*model.Task, error) {
    var task model.Task
    if err := r.DB.First(&task, id).Error; err != nil {
        return nil, err
    }

    return &task, nil
}

しかし必要なのは TASK: ○○ というグローバルに判定できるIDなので 以下の様なコードが必要

   task.ID = fmt.Sprintf("%s:%s","TASK",task.ID)

出力結果

{
  "data": {
    "task": {
      "id": "TASK:10",
      "title": "test"
    }
  }
}

これでグローバルなIDを返すことが出来た。 ※Task()ではStringから ID情報を抜き取る実装が必要になる。

directiveを使う

ここまでの実装を gqlgenの directive機能を使って分割することができる。

スキーマに以下を追加

 # 追加
directive @goField(forceResolver: Boolean, name: String) on INPUT_FIELD_DEFINITION
    | FIELD_DEFINITION



type Task implements Node {
  id: ID!  @goField(forceResolver: true)  # 追加
  title: String!
  note: String!
  completed: Int! # 0 or 1
  created_at: Time!
  updated_at: Time!
}

その上で再びgqlgenコマンドを実行すると以下のresolverが追加される。

func (r *taskResolver) ID(ctx context.Context, obj *model.Task) (string, error) {
    panic(fmt.Errorf("not implemented"))
}

IDフィールドだけ別リゾルバーとして中身の処理を書き換えることができる。 引数には初めに実装した Task() リゾルバーの値が入ってくる。 以下の様に実装する事でグローバルなIDを返す。

func (r *taskResolver) ID(ctx context.Context, obj *model.Task) (string, error) {
    return fmt.Sprintf("%s:%s", "TASK", obj.ID), nil
}

以上でIDフィールドを別リゾルバーに分割し実装することができた。

directiveの用途

今回の実装ではそこまでdirectiveの旨味を示せていない。

ネストしたスキーマなどでDBアクセスを分離したい時directiveを使えば不要なDB処理を減らせるのが1番大きなメリットである。

例えば以下の様な感じ。

type User {
id: ID!
name: String!

Tasks: [Task!]!  @goField(forceResolver: true)
}

この様にする事でクライアントがuserのidとnameのみが欲しい時に無駄にTaskを取得するのを防ぐことができる。

雑感

directive機能でResolverを分割する方法は理解したが、

forceReolver, name の引数が何に使用されるかが良く分からない。

forceResolverはおそらく指定したtypeのResolverを使用する(なければ生成)。 今回の場合はtaskResolver。

試しにfalseにするとtaskResolverは削除された。

nameに関しては良く分からない。

directive @goField(forceResolver: Boolean, name: String) on INPUT_FIELD_DEFINITION
    | FIELD_DEFINITION

このあたりドキュメントを読んでもあまり詳しい説明がないので機会があれば深ぼっていきたいと思った。

以上

GraphQL ページネーション Go gqlgen

GraphQLの必須項目ページネーションの実装方法を理解する。

使用技術 Go1.13 × gqlgen v0.13.0

ページネーションとは

APIのクエリを制限する方法。

  • 返却するデータの量を制限する事ができる。
  • クエリの負荷を事前に計算できる。

GraphQLのページネーション

GraphQLではページネーションの方針に決まりはないが、デファクトスタンダードRelay-style cursor pagenationという方式がある。 これはFaceBookが考案し、GitHubのGraphQL APIでも採用されている。

ページネーションの実装方法は自由だがデファクトスタンダードに乗っかることで使用者側の学習コストを抑えることができる。

Relay-style cursor pagenation

公式: relay.dev

こちらは以下の記事の画像が大変わかりやすい

f:id:shikatech:20210718150646p:plain
登場人物

https://qiita.com/gipcompany/items/ffee8cf0b1522a741e12

ざっくり説明すると

GraphQL API Queryの引数を以下の様な形にする

{
  # 次方向ページングの時に使う
  first: Int # 順方向に何件か
  after: String # afterで指定したcursorより後のedgeを取得

  # 前方向ページングの時に使う
  last: Int # 逆方向に何件か
  before: String # beforeで指定したcursorより前のedgeを取得
}

上記の引数に応じて以下の形でデータを返す。

〇〇Connection {
  pageInfo: PageInfo!
  edges: [Edge!]!
  nodes: [データエンティティー!]!
}

Connectionと明示的に宣言することでページネーションを実装しているQueryであることを示している。

登場人物一覧

// Queryの引数 
  first: Int
  after: String
  last: Int
  before: String

//返却されるデータのまとまり
Connection: {

//取得したConnectionのページ情報
  pageinfo: {
    startCursor: String
    endCursor: String
    hasNextPage: Boolean!
    hasPreviousPage: Boolean!
  }

//crsornodeを含むデータ配列
  edges: [edge]

//nodeのデータ配列
  nodes: [node]
}


//エンティティーオブジェクト
node: {}

//データの位置情報( 主にIDなどが使われる)
cursor: string 

実装

これまで見てきたページネーションを実装していく。

スキーマ

まずはスキーマ定義

page.graphql

type PageInfo {
  startCursor: String
  endCursor: String
  hasNextPage: Boolean!
  hasPreviousPage: Boolean!
}

interface Connection {
  pageInfo: PageInfo!
  edges: [Edge!]!
  nodes: [Node!]!
}

interface Edge {
  cursor: String!
  node: Node!
}

input PaginationInput {
  first: Int
  after: String
  last: Int
  before: String 
}

task.graphql

type Task implements Node {
  id: ID!
  title: String!
  note: String!
  completed: Int! # 0 or 1
  created_at: Time!
  updated_at: Time!
}

input NewTask {
  title: String!
  note: String!
}

input UpdateTask {
  id: ID!
  title: String
  note: String
  completed: Int
}

type TaskConnection implements Connection {
  pageInfo: PageInfo!
  edges: [TaskEdge!]!
  nodes: [Task!]!
}

type TaskEdge implements Edge {
  cursor: String!
  node: Task!
}

graphqlファイルは分割しMutation, Queryはschema.graphqlに記載

schema.graphql

schema {
  query: Query
  mutation: Mutation
}

type Mutation {
  createTask(input: NewTask!): Task!
  updateTask(input: UpdateTask!): Task!
}

type Query {
  tasks(input: PaginationInput!): TaskConnection!
  task(id: ID!): Task!
}


interface Node {
  id: ID!
}

scalar Time

Resolver

次はResolver

gqlgen コマンドでresolverは自動生成される。

今回はTasks()を実装していく。 生成直後は以下の通りpanicになる。

※筆者はresolverをモデル毎に分割する為にgqlgenの自動生成を不可にした。 実装の中身は変わらない。

参考: https://qiita.com/masalennon/items/e4df77e5f58f862db644

func (r *queryResolver) Tasks(ctx context.Context, input model.PaginationInput) (*model.TaskConnection, error) {
    panic(fmt.Errorf("not implemented"))
}

コード

上記のpanicを埋めたコードが以下。

func (r *queryResolver) Tasks(ctx context.Context, input model.PaginationInput) (*model.TaskConnection, error) {
    // validation
    if input.First == nil && input.Last == nil {
        return nil, errors.New("input.First or input.Last is required: input error")
    }
    if input.First != nil && input.Last != nil {
        return nil, errors.New("passing input.First and input.Last is not supported: input error")
    }
    if input.Before != nil && input.After != nil {
        return nil, errors.New("passing input.Before and input.After is not supported: input error")
    }

    var limit int
    if input.First != nil {
        limit = *input.First
    } else {
        limit = *input.Last
    }
    var tasksSizeLimit = 100
    if input.First != nil && *input.First > tasksSizeLimit {
        return nil, errors.New("input.First exceeds tasksSizeLimit: input error ")
    }
    if input.Last != nil && *input.Last > tasksSizeLimit {
        return nil, errors.New("input.Last exceeds tasksSizeLimit: input error ")
    }

    db := r.DB
    var tasks []*model.Task

    if input.After != nil {
        db = db.Where("id > ?", *input.After)
    }

    if input.Before != nil {
        db = db.Where("id < ?", *input.Before).Order("id desc")
    }

    if input.Last != nil {
        db = db.Order("id desc")
    }

    if err := db.Limit(limit + 1).Find(&tasks).Error; err != nil {
        return nil, errors.New("could not find tasks: data base error ")
    }

    //検索結果0の場合
    if len(tasks) == 0 {
        return &model.TaskConnection{
            PageInfo: &model.PageInfo{
                StartCursor:     nil,
                EndCursor:       nil,
                HasNextPage:     false,
                HasPreviousPage: false,
            },
            Edges: []*model.TaskEdge{},
            Nodes: []*model.Task{},
        }, nil
    }

    edges := make([]*model.TaskEdge, len(tasks))
    nodes := make([]*model.Task, len(tasks))

    // last, before 指定の時はスライスの後ろから入れていく
    if input.First != nil || input.After != nil {
        for i, task := range tasks {
            newEdge := &model.TaskEdge{
                Cursor: strconv.Itoa(task.ID),
                Node:   task,
            }
            nodes = tasks
            edges[i] = newEdge
        }
    } else {
        for i, task := range tasks {
            newEdge := &model.TaskEdge{
                Cursor: strconv.Itoa(task.ID),
                Node:   task,
            }
            nodes[len(tasks)-1-i] = tasks[i]
            edges[len(edges)-1-i] = newEdge
        }
    }

    startCursor := edges[0].Cursor
    endCursor := edges[len(edges)-1].Cursor

    var hasPreviousPage bool
    var hasNextPage bool

    // startCursorのIDより前に1件でもデータがある場合はpreviousPageはtrue
    startCursorInt, _ := strconv.Atoi(startCursor)
    endCursorInt, _ := strconv.Atoi(endCursor)
    var task model.Task
    if input.First != nil {
        if err := r.DB.Where("id <= ?", startCursorInt-1).First(&task).Error; err == nil {
            hasPreviousPage = true
        }
    } else {
        if err := r.DB.Where("id >= ?", endCursorInt+1).First(&task).Error; err == nil {
            hasNextPage = true
        }
    }

    // Firstが渡された場合 if limit以上 else limit以下
    if input.First != nil && limit < len(nodes) {
        endCursor = edges[len(edges)-2].Cursor
        hasNextPage = true
        return &model.TaskConnection{
            PageInfo: &model.PageInfo{
                StartCursor:     &startCursor,
                EndCursor:       &endCursor,
                HasNextPage:     hasNextPage,
                HasPreviousPage: hasPreviousPage,
            },
            Edges: edges[:len(edges)-1],
            Nodes: nodes[:len(nodes)-1],
        }, nil
    } else if input.First != nil && limit >= len(nodes) {
        return &model.TaskConnection{
            PageInfo: &model.PageInfo{
                StartCursor:     &startCursor,
                EndCursor:       &endCursor,
                HasNextPage:     hasNextPage,
                HasPreviousPage: hasPreviousPage,
            },
            Edges: edges,
            Nodes: nodes,
        }, nil
    }

    // Lastが渡された場合 if limit以上 else limit以下
    if input.Last != nil && limit < len(nodes) {
        startCursor = edges[len(edges)-limit].Cursor
        hasPreviousPage = true
        return &model.TaskConnection{
            PageInfo: &model.PageInfo{
                StartCursor:     &startCursor,
                EndCursor:       &endCursor,
                HasNextPage:     hasNextPage,
                HasPreviousPage: hasPreviousPage,
            },
            Edges: edges[len(edges)-limit:],
            Nodes: nodes[len(nodes)-limit:],
        }, nil
    } else if input.Last != nil && limit >= len(nodes) {
        return &model.TaskConnection{
            PageInfo: &model.PageInfo{
                StartCursor:     &startCursor,
                EndCursor:       &endCursor,
                HasNextPage:     hasNextPage,
                HasPreviousPage: hasPreviousPage,
            },
            Edges: edges,
            Nodes: nodes,
        }, nil
    }

    return nil, nil
}

解説

基本的にはRelayライブラリのアルゴリズムを満たしていった。(途中からかなり我流になった。)

relay.dev

問題はRelayでは全てをSELECTする仕様になっているのでRDBではレコード数が大量になった時に負荷がかかるということ。

筆者は以下のようにした

   if input.First != nil {
        if err := r.DB.Where("id <= ?", startCursorInt-1).First(&task).Error; err == nil {
            hasPreviousPage = true
        }
    } else {
        if err := r.DB.Where("id >= ?", endCursorInt+1).First(&task).Error; err == nil {
            hasNextPage = true
        }
    }

取得してきたレコードの前後にデータが存在しているかをチェックしてpageinfoを判定した。 もっといいやり方がありそう...

考察

  • 今回はConnectionの全ての要件を満たすように実装したが、実際のサービスでは 前方方向(first, after)か後方方向(last, before)どちらかだけ実装するのが無難。 実際に筆者の現場や他の記事がそうだった。

  • 検索用の引数などカスタマイズする事も可能

  • 開発者によって実装にバラつきがある。

このISSUEはアルゴリズムの実装を議論している

https://github.com/graphql/graphql-relay-js/issues/94

gqlgenではプラグインで追加される可能性もあるみたいなので車輪の開発が待ち遠しい。

以上

Apollo Client (React) 触ってみた。 Mutations編

前回の続き

https://shikatech.hatenablog.com/entry/2021/07/03/222958

Mutations

GraphQLのMutationを実行するHooks

https://www.apollographql.com/docs/react/data/mutations/#prerequisites

  const [addTodo, { data }] = useMutation(ADD_TODO);

useQueryと違い自動実行ではない。 タプルで発火用の関数(addTodo)を返し、任意のタイミングで実行させる。

useMutation、生成された発火関数、どちらでもオプションを指定でき、 別のオプションを指定した場合はuseMutationをオーバーライドして使う。

cacheも自動更新される。

使ってみた。

  const [updateTask] = useUpdateTaskMutation()


<button onClick={()=>updateTask({
                  variables: {
                    id: task?.id as string,
                    completed: task?.completed === 1 ? 0 : 1
                  }
                })}>
                  ✔︎
</button>

f:id:shikatech:20210704115812p:plain

cacheの値もDBも更新された。

上記は単一のレコードを変更した場合で、 複数のレコードを同時に更新する場合は自動でcache更新は行われない。

cacheも同時に変更する為にはupdate()関数が必要。

以下ドキュメントから抜粋 https://www.apollographql.com/docs/react/data/mutations/#making-all-other-cache-updates

const GET_TODOS = gql`
  query GetTodos {
    todos {
      id
    }
  }
`;

function AddTodo() {
  let input;
  const [addTodo] = useMutation(ADD_TODO, {
    update(cache, { data: { addTodo } }) {
      cache.modify({
        fields: {
          todos(existingTodos = []) {
            const newTodoRef = cache.writeFragment({
              data: addTodo,
              fragment: gql`
                fragment NewTodo on Todo {
                  id
                  type
                }
              `
            });
            return [...existingTodos, newTodoRef];
          }
        }
      });
    }
  });

  return (
    <div>
      <form
        onSubmit={e => {
          e.preventDefault();
          addTodo({ variables: { type: input.value } });
          input.value = "";
        }}
      >
        <input
          ref={node => {
            input = node;
          }}
        />
        <button type="submit">Add Todo</button>
      </form>
    </div>
  );
}

ほかにもAPIが提供されているが、useQueryと重複するモノも多かった為、 Mutations編はここまで。

Subscriptionはバックエンドが出来ればやっていきたいと思う。