アパッチスパークによる感情分析
目次
導入
Data streaming is the process of transmitting a continuous flow of data (also known as streams) typically fed into stream processing software to derive valuable insights. A data stream consists of a series of data elements ordered in time.
In this blog we will use tweepy and PySpark to connect together and stream tweets replies from Twitter API that's related to specified topics then analyze this tweet replies with Machine Learning models to evaluate sentiments which is Positive or Negative.
Our Architecture will be divided into four parts as shown in the following image:第1部:TwitterのAPIをPySparkに接続するリスナーを作成する
Before we start it's necessary to have a Twitter Developer Account to get an access to the Twitter API so we can connect it with PySpark and get the data.
You can get apply to a Developer Account from .
Now let's start with the codes:
ステップ1
必要なライブラリをインポートする必要があります.
#Necessary Libraries
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
Tweepyで我々のTwitterのAPI資格情報を認証することができますまた、それは我々がブログで後でカバーするPySparkと接続するリスナーインスタンスを作成します.私たちは、火の粉に接続するためにlocalhostとポートをつくるために、ソケットを必要とします.
そしてJSONはJSONでストリーミングされているので、Twitter APIから得られるメッセージを処理するために使用されます.
ステップ2
証明書を挿入する必要があります.
#Twitter API credentials
CONSUMER_KEY = "secret"
CONSUMER_SECRET = "secret"
ACCESS_TOKEN = "secret"
ACCESS_SECRET = "secret"
開発者アカウントを作成した後に、4つの資格情報を取得します.消費者キー
消費者秘密
アクセストークン
アクセス秘密
我々は、Twitter APIに接続するこれらの認証を使用します.
ステップ3
ユーザはトピックを指定します:
#User inputs keywords, Topic of the tweets
keywords = []
n = int(input("Enter the number of keywords for your tweets topic\n"))
for i in range(n):
i = input("Enter keyword number: " + str(i+1) + '\n')
keywords.append(i)
我々は、ユーザーが必要なキーワードの数を取るために入力を取るようにつぶやきのトピックを指定するユーザーを必要とし、我々は指定された番号のループを作成し、それぞれの入力を取る.ステップ4
ストリームリスナーオブジェクトを作成する
#StreamListener object to get the data from Twitter API
class TweetsListener(StreamListener):
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
#Load the data as json structure
msg = json.loads(data)
#Get only extendable replies from the tweets
if isinstance(msg['in_reply_to_status_id_str'], str):
if "extended_tweet" in msg:
self.client_socket.send(str(msg['extended_tweet']['full_text']+"\n").encode('utf-8'))
print(msg['extended_tweet']['full_text'])
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def if_error(self, status):
print(status)
return True
StreamListenerオブジェクトは、TwitterのAPIは、つぶやきは、指定されたトピックの返信を取得すると接続されます、我々はつぶやきの返信を得るためにINSERN RESPYCHONE TOHARE STATUSHIGE IDHER STRを指定する必要があります.ステップ5
ストリームを検証します.
def send_tweets(c_socket):
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
twitter_stream = Stream(auth, TweetsListener(c_socket))
twitter_stream.filter(track=keywords, languages=['en'])
資格情報を検証し、接続を送信する別の重要な機能は、リスナーを使用してTwitterのAPIを接続するには、ここでは、ユーザーが以前に入力したキーワードを定義し、つぶやきの言葉を返します.ステップ6
主な機能をまとめてグループ化します:
if __name__ == "__main__":
#Initiate the socket object
new_skt = socket.socket()
#local machine address (changeble)
host = "127.0.0.2"
#Specific port for the service
port = 9003
#Binding host and port
new_skt.bind((host, port))
print("Now listening on port: %s" % str(port))
#waiting for client connection
new_skt.listen(5)
#Establish connection with client. it returns first a socket object,c, and the address bound to the socket
c, addr = new_skt.accept()
print("Received request from: " + str(addr))
# and after accepting the connection, we aill sent the tweets through the socket
send_tweets(c)
ここでは、localhostとポートを結合するソケットを作成し、PySparkを接続するよう要求を行います.PySparkコードを実行するときは、ストリームリスナーに接続します.注意:このコードはPySparkのとは異なるIDEで実行する必要がありますので、次のコードを実行します.
パート2:TwitterのAPIと接続するPySparkを使用してください
ステップ1
必要なライブラリ
#Import necessary libraries
import findspark
findspark.init('E:\spark-3.1.2-bin-hadoop3.2')
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
私たちはスパークフォルダを見つけてSparkContextを見つけてSparkContextをインポートします.スパークの新しいインスタンスを作成します(ストリームごとに1つだけを作成することができます)、StreamingContextはストリームコンテキストを開始するために使用され、SQLContextはSQLコンテキストの開始に必要です.ステップ2
変数の初期化
sc = SparkContext()
#Initiating the StreamingContext with 10 second batch interval.
ssc = StreamingContext(sc, 10)
#Initiating sqlcontext
sqlContext = SQLContext(sc)
ここでは、スパークコンテキストを作成し、SCに格納し、10秒のバッチ間隔とSQLContextでStreamingContextを開始します.ステップ3
ソケットストリームの開始とツイート処理の処理
#Initiating streaming text from a TCP (socket) source:
socket_stream = ssc.socketTextStream("127.0.0.2", 9003)
#Lines of tweets with socket_stream window of size 60
lines = socket_stream.window(100)
#Creating a tuple to assign names
from collections import namedtuple
Tweet = namedtuple( 'Tweet', 'line' )
#Applying different operations on the tweets and save them to a temporary sql table
( lines.map( lambda word: ( word.lower(), 1 ) )
.map( lambda rec: Tweet( rec[0]) )
.foreachRDD( lambda rdd: rdd.toDF().limit(1000)
.registerTempTable("tweets") ) )
私たちは、以前に指定したのと同じlocalhostとポートでSocketRadio変数を作成します.これはストリームリスナーリクエストに対応するために必要です.また、私たちは、それぞれの行をストリーミングし、登録された名前を付けて登録します.
ステップ4
スタートストリーム
#Start Streaming
ssc.start()
だから簡単!ストリームを開始します.start関数は、上記のすべてのコードを適用するストリームリスナーに接続する順序を与えます.
ハンサムデータストリーミングを見てみましょう
ステップ5
エクスポートする
#Export tweet replies in a csv file
import time
count = 0
while count < 100:
time.sleep(10)
lines_sql = sqlContext.sql( 'Select line from tweets' )
lines_df = lines_sql.toPandas()
lines_df.to_csv('tweet_data.csv', mode='a', header=False)
count += 1
このコードは、指定された回数の10秒ごとに更新するループを作成し、つぶやきに格納されている行を取り、DataFrameに変換し、CSVファイルにコンテンツを追加します.注意:各ループは、既に格納されている回答を取ることができるので、我々はいくつかのクリーニングを行うにはExcelが必要です.
Part 3 : Excelを使用してデータを消去する This part is only one step, the data we got is really messy, Look at that:
私たちは19 kの線を越えましたが、ほとんどすべてが重複しています.最初に不要な列を削除する必要があります.
私たちは約7 Kの行を得た重複行を削除した後、我々はこれらのステップをPythonで行うことができますが、我々はCSVファイルを持っているようにExcelでそれを行うには高速です.
第4部:ロジスティック回帰を用いた回答からの感情分析の予測 In this part we used a big dataset from Kaggle.com, That has a 1,6 million tweets with their sentiment analysis, you can find this dataset here .
このデータセットを使用してモデルを訓練します.
ステップ1
必要なライブラリ
import pandas as pd
import numpy as np
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import re
import nltk
import matplotlib.pyplot as plt
import seaborn as sns
私たちは、WordNetRemmazerは、それがベースのフォームに単語を変換するために使用されます、TFIDFVECtorizerは、2つの塊に単語をトークン化するために使用され、LogSticreRecoveryは、我々の手法は、感情を予測するTrainCount TestCountスプリットは、我々のモデルのスコアをテストするために使用されます.reは前処理フェーズに対して単語の変更を適用するために使用され、nltkはwordnetlemmatizerをダウンロードするために使用され、matplotlibとseabornは可視化に使用されますステップ2
データセットをインポートする
#Reading the csv file storing it into a DataFrame
df = pd.read_csv('E:\DigiSay Project\Trained Tweets.csv', encoding='latin-1' , names = ["sentiment", "ids", "date", "flag", "user", "text"])
df
#Negative = 0 , Positive = 1 , Neutral = 2
df.sentiment.replace(4,1, inplace = True)
#Store the two columns values into two seperate arrays (For Machine Learning stage)
text , sentiment = df['text'].values , df['sentiment'].values
最初に、我々はKaggleから得たデータセットを輸入します.CSVファイルとして、我々はデータファイルにデータを格納します、このデータセットの正の値はデフォルトで4です.そして、2つの列(ツイート)と(センチメント)を2つの配列に格納し、後処理フェーズで後で使用する.
ステップ3
感情の分配をプロットする
#Check the number of Positive, Natural and Negative Sentiments
ax = plt.axes()
ax.set_facecolor("#F5F5DC")
sns.countplot(x='sentiment',data=df, palette = "Set1",\
edgecolor='black', linewidth=0.3,alpha=0.7)
plt.title('Sentiment Distribution',fontsize=14)
plt.xlabel('Sentiment',fontsize=10)
plt.show()
感情の分配の仕方を理解するために、次のプロットをプロットした.私たちは、感情の肯定的で負の価値だけがある、そして、彼らが等しいというプロットからわかることができます.
ステップ4
前処理の準備
#Pre-defined emoji list
emojis = {':)': 'smile', ':-)': 'smile', ';d': 'wink', ':-E': 'vampire', ':(': 'sad',
':-(': 'sad', ':-<': 'sad', ':P': 'raspberry', ':O': 'surprised',
':-@': 'shocked', ':@': 'shocked',':-$': 'confused', ':\\': 'annoyed',
':#': 'mute', ':X': 'mute', ':^)': 'smile', ':-&': 'confused', '$_$': 'greedy',
'@@': 'eyeroll', ':-!': 'confused', ':-D': 'smile', ':-0': 'yell', 'O.o': 'confused',
'<(-_-)>': 'robot', 'd[-_-]b': 'dj', ":'-)": 'sadsmile', ';)': 'wink',
';-)': 'wink', 'O:-)': 'angel','O*-)': 'angel','(:-D': 'gossip', '=^.^=': 'cat'}
#Pre-defined Stopword list
stopwordlist = ['a', 'about', 'above', 'after', 'again', 'ain', 'all', 'am', 'an',
'and','any','are', 'as', 'at', 'be', 'because', 'been', 'before',
'being', 'below', 'between','both', 'by', 'can', 'd', 'did', 'do',
'does', 'doing', 'down', 'during', 'each','few', 'for', 'from',
'further', 'had', 'has', 'have', 'having', 'he', 'her', 'here',
'hers', 'herself', 'him', 'himself', 'his', 'how', 'i', 'if', 'in',
'into','is', 'it', 'its', 'itself', 'just', 'll', 'm', 'ma',
'me', 'more', 'most','my', 'myself', 'now', 'o', 'of', 'on', 'once',
'only', 'or', 'other', 'our', 'ours','ourselves', 'out', 'own', 're',
's', 'same', 'she', "shes", 'should', "shouldve",'so', 'some', 'such',
't', 'than', 'that', "thatll", 'the', 'their', 'theirs', 'them',
'themselves', 'then', 'there', 'these', 'they', 'this', 'those',
'through', 'to', 'too','under', 'until', 'up', 've', 'very', 'was',
'we', 'were', 'what', 'when', 'where','which','while', 'who', 'whom',
'why', 'will', 'with', 'won', 'y', 'you', "youd","youll", "youre",
"youve", 'your', 'yours', 'yourself', 'yourselves']
#A lemmatizer job is to convert the word to the base form
Lem = WordNetLemmatizer()
#Patterns to be replaced
urlPattern = r"((http://)[^ ]*|(https://)[^ ]*|( www\.)[^ ]*)"
userPattern = '@[^\s]+'
alphaPattern = "[^a-zA-Z0-9]"
sequencePattern = r"(.)\1\1+"
seqReplacePattern = r"\1\1"
ここでは、定義されたリストを定義します.これらの定義は、EMJISとストップワード、URLパターン、ユーザパターン、および前処理段階での非アルファパターンやシーケンスパターンを置き換えるのに役立ちます.ステップ5
前処理関数
#Data cleaning function
def Data_preprocess(textdata , processedText):
for tweet in textdata:
tweet = tweet.lower()
#Replace all URls with 'URL'
tweet = re.sub(urlPattern,' URL',tweet)
#Replace all emojis
for emoji in emojis.keys():
tweet = tweet.replace(emoji, "EMOJI" + emojis[emoji])
#Replace @USERNAME to 'USER'
tweet = re.sub(userPattern,' USER', tweet)
#Replace all non alphabets
tweet = re.sub(alphaPattern, " ", tweet)
#Replace 3 or more consecutive letters by 2 letter
tweet = re.sub(sequencePattern, seqReplacePattern, tweet)
tweetwords = ''
for word in tweet.split():
#Checking if the word is a stopword
#if word not in stopwordlist:
if len(word)>1:
#Lemmatizing the word
word = Lem.lemmatize(word)
tweetwords += (word+' ')
processedText.append(tweetwords)
return processedText
ここでは、我々は両方のトレーニングデータと我々のストリームされたデータで使用する前処理機能を定義し、この関数は、各つぶやきを取り、低機能を使用して、その後、すべてのURL、emojis、ユーザー名、非アルファベット、および配列の単語を置き換えて、我々は単語のlemmatizerを使用して関数でも、各単語をベースフォームに変換するために使用します.ステップ6
データのトレーニング
#Initiating an array
processedBigData = []
#Cleaning the text array
Data_preprocess(text,processedBigData)
#The beginning of the model test phase starts with splitting the data into train and test,
#It's a big data so 5% portion to test is very fine
X_train, X_test, y_train, y_test = train_test_split(processedBigData, sentiment,
test_size = 0.05, random_state = 0)
#Creating a Vectoriser
#A Vectoriser with range of (1,2) ngrams means we will take the words on two portions
vectoriser = TfidfVectorizer(ngram_range=(1,2), max_features=500000)
#Fitting the Vectoriser for each train tweets and test tweets
vectoriser.fit(X_train)
X_train = vectoriser.transform(X_train)
X_test = vectoriser.transform(X_test)
#Creating Logistic Regression object
lr = LogisticRegression(C = 2, max_iter = 1000, n_jobs=-1)
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
print(classification_report(y_test, y_pred))
前処理関数を、1万6百万ツイートを含むテキスト配列に適用し、空リストに格納します.その後、TrainRound TestConeスプリット関数を呼び出して、データをテストするためのデータの95 %を取るために、私たちはTFIDFVECtorizerを使用して単語をトークン化するには、私たちは2つのチャンクでつぶやきをトークンをトークン化します.
つぶやきをトークン化した後にロジスティック回帰を適用し、テストスコアを評価します.
precision recall f1-score support
0 0.83 0.82 0.83 39989
1 0.82 0.84 0.83 40011
accuracy 0.83 80000
macro avg 0.83 0.83 0.83 80000
weighted avg 0.83 0.83 0.83 80000
ステップ7
データのインポートとクリーニング
#Reading the streamed tweets csv file storing it into a DataFrame
tweets_df = pd.read_csv('E:\DigiSay Project\Tweets.csv', names = ['text'],encoding='latin-1')
#Storing the tweets into an array
tweetText = tweets_df['text'].values
len(tweetText)
1714
我々は、Excelからそれを掃除した後、我々のデータをインポートし、ツイートの配列を格納し、我々は1714行を持っていることを伝えることができます我々は1714のつぶやきを意味します.#Initiating two lists for cleaning purposes
tweetsNOTclean = []
tweetsNOTclean2 = []
#Cleaning the data with our Preprocessing function
Data_preprocess(tweetText,tweetsNOTclean)
#Choosing only tweets contains strings with our chosed topics and storing it into the second list
for text in tweetsNOTclean:
if ('squid game' in text) or ('sae byeok' in text):
tweetsNOTclean2.append(text)
print(len(tweetsNOTclean2))
945
私たちは、私たちのキーワードを含んでいるかどうかチェックするために、それぞれのつぶやきを取る、このケース(イカのゲーム)と(SAE Byeok)では、つぶやきが我々のキーワードのいずれかを含んでいないので、我々はそれを必要としないので、私たちはそれをドロップして、この浄化の後、私たちのつぶやき返信が945になったことがわかりました.#Checking for duplicates
df_preprocessed = pd.DataFrame(tweetsNOTclean2)
df_preprocessed.duplicated().sum()
170
我々は、データを重複してカウントするために我々のリストに変換します.#Dropping duplicates and storing the final result in an 1d list
preprocessedTweets = df_preprocessed.drop_duplicates().values.flatten().tolist()
print(len(preprocessedTweets ), type(preprocessedTweets))
preprocessedTweets
今、私たちは重複をドロップし、データの変換を1次元リストに変換することができます.775 <class 'list'>
['USER liked it more than squid game ',
'USER is squid game must see inquiring mind want know ',
'USER USER just finished the squid game and it actually decent ',
'USER squid game mai ha betrayal to pakistani while playing the game ',
'USER squid game didn really stick the landing ',
'but now squid game put me in drama mood ll have to see if can find something little more uplifting after finish ',
'USER yeah don think in squid game case it wa bad enough to ruin the whole show but it really did disappoint compared to the entire middle of the show ',
'this awesome uncle put together some lego while watching squid game ',
'USER USER USER USER no the way one dy is very important dying honourably is creed everyone sld follow didn he die eventually can save ur life bcoz want to do whatever it take to save it our living right now is squid game our choice decision action all part of game ',
'USER and magic memory an opponent never have existed even if it not reciprocal equally an opponent will never exist mm think still don know lot and sometimes fear paralyze me didn know it amp don know what to dott think would be fine to see squid game ',
我々は今、775ツイートの回答を自分たちの感情を予測するには、どのように光沢がある見て🤩.手順8 :
感情を予測する
#Fitting the train data
vectoriser.fit(processedBigData)
#Transforming each Train tweets and our streamed tweets
X_train = vectoriser.transform(processedBigData)
X_test = vectoriser.transform(preprocessedTweets)
#Creating the Logistic Regression model
lr = LogisticRegression(C = 2, max_iter = 1000, n_jobs=-1)
lr.fit(X_train, sentiment)
#Predicting each sentiment for each tweet
y_pred = lr.predict(X_test)
#Storing the results into a DataFrame
finalResult = pd.DataFrame(list(zip(preprocessedTweets, y_pred)) , columns=['Tweets','Sentiment'])
finalResult['Sentiment'].replace([1,0],['Positive','Negative'],inplace=True)
すべてのクリーニングの後、我々は今、我々のデータをベクトル化、フィットして、変換することができます、我々はここでロジスティック回帰を使用します.最終結果
finalResult
Tweets Sentiment
0 USER liked it more than squid game Positive
1 USER is squid game must see inquiring mind wan... Positive
2 USER USER just finished the squid game and it ... Positive
3 USER squid game mai ha betrayal to pakistani w... Negative
4 USER squid game didn really stick the landing Negative
... ... ...
770 USER still mad and for you info ratmy squid ga... Positive
771 maybe it wa not the squid game saw that she wa... Negative
772 sang woo and sae byeok plan together Positive
773 sae byeok and sang woo are similar character i... Positive
774 USER USER USER USER to me squid game basically... Positive
This is our final DataFrame with each tweet and it's sentiment.
#Displaying the distribution of the positive and negative sentiments
ax = plt.axes()
ax.set_facecolor("#F5F5DC")
sns.countplot(x='Sentiment',data=finalResult, palette = "Set1",\
edgecolor='black', linewidth=0.3,alpha=0.7)
plt.title('Sentiment Distribution',fontsize=14)
plt.xlabel('Sentiment',fontsize=10)
plt.show()
そして、ここでは、感情の分布を示す最後のプロットは、私は肯定的な感情は、私たちのつぶやきのトピックは、偉大なシリーズにあるが、もちろんシリーズは、多くの死、血と激しいシーンを持っているので、これは私たちのつぶやきの返信内容の一部かもしれないので、上昇し、感情の結果に反映される期待していた.
このブログを読んでくれてありがとう.
アーメドアブドゥワワブ
Reference
この問題について(アパッチスパークによる感情分析), 我々は、より多くの情報をここで見つけました https://dev.to/ahmedabdulwahab/adad-5aioテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol