リアルタイムのカメラ映像+Google Cloud Vision その1:FACE_DETECTION


これなに?

カメラ映像から顔を探し出す、よくあるプログラム。
「リアルタイムのカメラ映像+Google Cloud Vision(+OpenCV)」のサンプルコードが見つからなかったので、投稿。

  • 顔を探し出すエンジンはGoogle Cloud Vision API(FACE_DETECTION)を利用。SDKではなくREST版。
  • カメラ映像の取り込みや画面表示なんかの足回りは懐かしのOpenCVを利用。
  • OpenCVの顔検出エンジンは利用せず。
  • FACE_DETECTIONの部分を別の処理(LOGO_DETECTIONとかLABEL_DETECTIONとか)に変えても遊べるよ。
  • とりあえず動けばOkのサンプルだから、マルチスレッドの処理とかいい加減だよ。

環境

  • Windows 7
  • OpenCV 3.4.1
  • Python 2.7.15
    • Google Cloud Vision SDKが3系非対応なので、仕方なく2.7を利用中。(今回はSDK使わないけど)

参考にした資料

コード

sample.py
#! /usr/bin/python
# -*- coding: utf-8 -*-!

# カメラ映像から顔を探し出すアプリ Google Cloud Vision API利用版

import sys
import base64
import cv2
from requests import Request, Session
import json
import time
import threading

# GCPのAPIキー
api_key = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'

# 検出する顔の数の最大数 (増やすとレスポンスが遅くなる)
max_results = 1

# DISCOVERY_URL
DISCOVERY_URL = 'https://vision.googleapis.com/v1/images:annotate?key='

# cv画像と画像ファイルへのPathと検出最大数が引数
def googleAPI(img, max_results):
    # 通信不良等を考慮してTry/expect
    try:
        # カメラ画像をJPG画像へ変換
        retval, image = cv2.imencode('.jpg', img)

        # 顔を検出するやつのResponse作成
        str_headers = {'Content-Type': 'application/json'}
        batch_request = {'requests': [{'image': {'content': base64.b64encode(image)}, 'features': [{'type': 'FACE_DETECTION', 'maxResults': max_results, }]}]}

        # セッションを作ってリクエストSend
        obj_session = Session()
        obj_request = Request("POST", DISCOVERY_URL + api_key, data=json.dumps(batch_request), headers=str_headers)
        obj_prepped = obj_session.prepare_request(obj_request)
        obj_response = obj_session.send(obj_prepped, verify=True, timeout=180)

        # jsonを抽出
        response_json = json.loads(obj_response.text)

        # return
        return response_json

    except:
        return img, ""

class googleApiThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.return_value = None   # RETURN VALUE
        self.frame = None
        self.flagStop = True # TRUE = LOOPし続ける
        self.processTime = 0.0

    def run(self):
        # ちゃんとしたカメラ画像が格納されるまで少しWait
        # カメラ画像が格納されないうちにgoogleAPIを呼び出すと、cv2.imencodeがExceptionとなる。。
        time.sleep(1)

        while(self.flagStop):

            # 処理時間を測定
            timeStart = time.time()

            # Google Cloud Vision APIの呼び出し
            self.return_value = googleAPI(self.frame, max_results)

            timeEnd = time.time()
            self.processTime = timeEnd - timeStart

    def set_frame(self, frame):
        self.frame = frame

    def set_stopFlag(self):
        self.flagStop = False

    def get_value(self):
        return self.return_value

    def get_processTime(self):
        return self.processTime

if __name__ == '__main__':

    # カメラ映像の取り込みスタート
    cap = cv2.VideoCapture(0)

    # 別Threadの起動
    threadGoogleApi = googleApiThread()
    threadGoogleApi.start()

    while(True):
        ret, frame = cap.read()

        # 別スレッドの認識処理の画像を更新
        threadGoogleApi.set_frame(frame)

        # 別スレッドの最新の処理結果を受け取る(1秒に1回ぐらいしか更新されないけど)
        response_json = threadGoogleApi.get_value()
        processTime = threadGoogleApi.get_processTime()

        # 'faceAnnotations'があれば顔あり
        if response_json is not None:
            if 'faceAnnotations' in response_json['responses'][0]:
                faces = response_json['responses'][0]['faceAnnotations']

                for face in faces:
                    # 0と2が両端の番地
                    x = face['fdBoundingPoly']['vertices'][0]['x']
                    y = face['fdBoundingPoly']['vertices'][0]['y']
                    x2 = face['fdBoundingPoly']['vertices'][2]['x']
                    y2 = face['fdBoundingPoly']['vertices'][2]['y']
                    cv2.rectangle(frame, (x, y), (x2, y2), (0, 0, 255), thickness=5)

        # 処理速度を表示(画像に書き込み)
        cv2.putText(frame, str('%03.1f' % processTime) + " sec", (0, 30), cv2.FONT_HERSHEY_PLAIN, 2, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow("camera image", frame)
        if cv2.waitKey(1) == 27:    # ESCキーで終了
            break

    # 終了処理
    threadGoogleApi.set_stopFlag()
    cap.release()

実行結果

顔の部分を赤枠で囲んでます。
左上の数字はGoogle Cloud Visionの処理時間。1回に1~2秒かかっていて、リアルタイム性は低い。
バッチ用かな?