ストリーミングでつぶやきをストリーミング


無料のAPIを使用して構築は、自分の好きな言語で自分の新しいスキルを教えるための素晴らしい方法です.私はいつも何か新しいことを学ぶ下劣な方法としてAPIを見つけました.APIを構築すると、ビデオチュートリアルではできないプログラミングの新しい部分を学ぶことを強制する課題をもたらします.
TwitterのAPIのフィルタリングストリームのエンドポイントは、パブリックつぶやきのリアルタイムストリームをフィルタリングすることができます.あなたは特定の属性のつぶやきをフィルタリングすることにより、Twitterの議論にタップすることができます.あなたは最新のジョブの投稿を見つけることができる天気予報イベント、またはトレンドの上に監視します.
この記事では、Twitterのルールを作成し、オープンソースライブラリを使ってストリームを管理する方法について説明します.このライブラリは私のプロジェクトのために構築されたfindtechjobs それで、私はTwitterで最新の技術仕事を見つけました.
あなたが完全なコード例を始めるために始めたならば

どこから始めますか。


最初のステップは、アプリを作成し、消費者のキーのセットを取得することです.あなたがAPIキーとAPI秘密キーを持っている場合、あなたはtwitterstream

アクセストークンを生成する


私たちはtwitterstream アクセストークンを生成する.このアクセストークンは、前進するすべてのネットワーク要求を認証するのに使用されます.下記のコードで、我々は「基本的な」HTTP認証方式でTwitterのものにネットワーク要求をします.そして、twitterstream アクセストークンで.
    tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret("YOUR_KEY", "YOUR_SECRET_KEY").RequestBearerToken()
// Create an instance of twitter api
    api := twitterstream.NewTwitterStream(tok.AccessToken)

ストリーミング規則の設定


ストリーミングルールを使用してストリームを適切な情報を提供します.ルールは、メッセージのキーワード、ハッシュタグ、URLなどのTwitterの属性の様々な一致します.偉大なルールを作成するには、Twitterのストリームを成功させるための基本です.あなたが関連情報を収穫することができるように、あなたが流れ出るのであなたの規則を精製し続けることは重要です.
ソフトウェアエンジニアの就職のためのストリームを作成しましょうtwitterstream . Tweetを有効にするには
  • 英語で投稿
  • 流行しない
  • もう一つのつぶやきへの返事でない
  • 「採用」という単語を含む
  • そして、「ソフトウェア開発者」または「ソフトウェアエンジニア」という語を含みます
  • The twitterstream パッケージは建築規則を容易にする.私たちはNewRuleBuilder TwitterのAPIとして多くのルールを作成するには、当社の消費者のキーを可能にします.
    rules := twitterstream.NewRuleBuilder().
            AddRule("lang:en -is:retweet -is:quote hiring (software developer OR software engineer)", "hiring software role").
                Build()
    
    res, err := api.Rules.Create(rules, false)
    
    最初の部分はtwitterstream 作成するNewRuleBuilder .
    我々のルールを追加するとき、我々は2つの引数を渡しますAddRule . 一つ目は多くの演算子を持つ長い文字列です.それらの間のスペースを伴う連続した演算子はブール「AND」論理に終わります.例えばcats dogs 単語“猫”と“犬”を含むつぶやきに一致します.2番目の引数AddRule タグラベルです.これは、ストリーミング応答の特定のつぶやきに一致する規則を識別するために使用できる自由形式のテキストです.タグは、ルール間で同じことができます.
    最初の議論に集中しましょう.それぞれの演算子は何かユニークです.
  • 最初はシングルですlang:en BCP 47の言語識別子です.これは、英語で書かれたつぶやきのためのストリームをフィルタリングします.単一のLANG演算子をルールで使用することができます.
  • それから、我々はRetweets-is:retweet . 演算子の前にマイナス記号を含めることで論理(否定)を使用しません.否定は言葉にも適用できます.例えば、cat #meme -grumpy 単語の猫とのつぶやきと一致します.
  • また、引用符のつぶやきを除外する-is:quote . 引用つぶやきはコメントでつぶやきです、そして、私はこのオペレーターが非常に役に立つとわかりました.私がFindTechJobsを造っていた時.Io、私は多くの人々が彼らの意見で自動雇用に関する記事をretweetingに遭遇しました.これらの引用つぶやきは、関係のない仕事転勤で私のデータセットを乱雑にしました.
  • 私はそれを含む言葉につぶやきの私のストリームを絞り込むhiring . 仕事についてさえずる人々は、「私のチームが雇っている」と言います、あるいは、「startupcoは雇っています...」.
  • 最後に(software developer OR software engineer) , OR演算子を組み合わせた演算子のグループ化です.つぶやきがこれらの単語のいずれかが含まれている場合、つぶやきが一致します.
  • 我々のルールを構築した後、我々はそれらを作成するapi.Rules.Create . あなたのルールを削除する場合は、使用することができますapi.Rules.Delete あなたが現在持っている各々の規則のIDで.あなたの現在のルールを見つけることができますapi.Rules.Get .
    あなたはもっと学ぶことができます.また、ルールを作成するエンドポイントです.

    非マーシャルフックを設定する


    我々はツイートをunmarshalすることができますので、私たちのつぶやきの独自の構造を作成する必要があります.Twitterのフィルタされたストリームのエンドポイントは、私たちは、それぞれのつぶやき(後の詳細)の追加情報を取得することができます.このデータを簡単に見つけるために、データモデルを表す構造体を作成する必要があります.
    type StreamDataExample struct {
        Data struct {
            Text      string    `json:"text"`
            ID        string    `json:"id"`
            CreatedAt time.Time `json:"created_at"`
            AuthorID  string    `json:"author_id"`
        } `json:"data"`
        Includes struct {
            Users []struct {
            ID       string `json:"id"`
            Name     string `json:"name"`
            Username string `json:"username"`
            } `json:"users"`
        } `json:"includes"`
        MatchingRules []struct {
            ID  string `json:"id"`
            Tag string `json:"tag"`
        } `json:"matching_rules"`
    }
    
    ストリームされているすべてのつぶやきは[]bytes デフォルトでは.私たちは、データを各ツイートをunmarshalingすることによって使用可能にすることができますStreamDataExample . 非マーシャルフックを設定することが重要ですSetUnmarshalHook それで、我々は処理することができます[]bytes Goroutineの安全な方法で.
    api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
        data := StreamDataExample{}
    
        if err := json.Unmarshal(bytes, &data); err != nil {
            fmt.Printf("failed to unmarshal bytes: %v", err)
        }
    
        return data, err
    })
    
    データモデルがどのように見えるか不明な場合は、常にバイトのスライスから文字列を作成できます.
    api.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
        return string(bytes), nil
    })
    

    ストリームの開始


    私たちのストリーミングルールとunmarshalフックを作成した後、我々はつぶやきをストリーミングを開始する準備が整いました.
    デフォルトでは、Twitterは、各つぶやきについての情報の限られた量を返すときにストリーム.我々は、ストリームの拡大と、各つぶやきの追加情報を要求することができます.
        streamExpansions := twitterstream.NewStreamQueryParamsBuilder().
            AddExpansion("author_id").
            AddTweetField("created_at").
            Build()
    
        // StartStream will start the stream
        err = api.StartStream(streamExpansions)
    
    私たちは最初にいくつかのストリーム展開を作成しますNewStreamQueryParamsBuilder . このビルダーは、クエリパラメータを作成してストリームを開始します.ここでは、それぞれのつぶやきに2つの追加の情報を追加している
  • AddExpansion("author_id") 各つぶやきストリームの著者のIDを要求します.これは、ツイートしているユーザーを追跡している場合に便利です.
  • AddTweetField("created_at") つぶやきがつぶされた時間を要求します.あなたが年代順につぶやきをソートする必要がある場合、これは便利です.
    あなたは利用可能な詳細について学ぶことができます
  • それから、我々は我々の拡大でストリームを始めますapi.StartStream . このメソッドは、Twitterのストリーミングエンドポイントに長いランニングを要求します.リクエストはネットワークリクエストの持続時間を通じて増加的に解析される.あなたがTwitterからストリーミングデータを消費する方法について学ぶことに興味があるならば、あなたは彼らのドキュメンテーションを読むべきです

    消費量


    私たちの長い実行中のリクエストで処理される各つぶやきは、GOチャネルに送信されます.我々range このチャンネルを介して各つぶやきを処理し、Twitterからのエラーをチェックします.我々が起動するとき、ストリームは止まりますapi.StopStream , その後、ループの残りの部分をスキップして、先頭に戻るとclose チャンネルからの信号.
       // Start processing data from twitter after starting the stream
        for tweet := range api.GetMessages() {
    
            // Handle disconnections from twitter
            if tweet.Err != nil {
                fmt.Printf("got error from twitter: %v", tweet.Err)
    
                // Stop the stream and wait for the channel to close on the next iteration.
                api.StopStream()
                continue
            }
            result := tweet.Data.(StreamDataExample)
    
            // Here I am printing out the text.
            // You can send this off to a queue for processing.
            // Or do your processing here in the loop
            fmt.Println(result.Data.Text)
        }
    
    Twitterのサーバーは、ストリーム接続を無期限に保持しようとします.Twitterからのエラーは、ストリームで利用可能になります.切断はいくつかの理由から起こり得る.
  • ストリーミングサーバーは、Twitter側で再起動されます.これは通常、コード展開に関連しており、一般的に期待され、周囲に設計されるべきです.
  • あなたのアカウントは、つぶやきのあなたの毎日/毎月のクォータを超えた.
  • アクティブな冗長接続が多すぎます.

  • Twitterからの切断の予想


    それはあなたのストリームの関連情報を欠落しているので、可能な限りTwitterに接続を維持するために重要なデータソースを作成することが重要です.これは、切断が発生し、再接続ロジックをTwitterからの切断を処理するために構築されることを期待する必要があります
    再接続ロジックをビルドすることができますtwitterstream ’S APIとdefer 声明以下は、スニペットです
    // This will run forever
    func initiateStream() {
        fmt.Println("Starting Stream")
    
        // Start the stream
        // And return the library's api
        api := fetchTweets()
    
        // When the loop below ends, restart the stream defer initiateStream()
        defer initateStream()
    
        // Start processing data from twitter
        for tweet := range api.GetMessages() {
            if tweet.Err != nil {
    
                fmt.Printf("got error from twitter: %v", tweet.Err)
    
                api.StopStream()
                continue
            }
            result := tweet.Data.(StreamDataExample)
            fmt.Println(result.Data.Text)
        }
        fmt.Println("Stopped Stream")
    }
    
    
    ストリームを開始した後、ツイートを処理する前にdefer メソッド自体.これは、メッセージチャネルが終了するたびにTwitterに再接続を処理します.

    最後の思考


    私はあなたがTwitterからのつぶやきをストリーミングでこのライブラリに便利を見つける願っています.このライブラリを構築することは挑戦でした、そして、私はゴーの並行性モデルがどのように働くかについて学びました.あなたがこのポストが好きであるならば、私がソフトウェア界で私の旅行を文書化するので、私の上について来てください.