Feast Log #3


Creating a Feast Server

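This time the server is created from the Ubuntu CLI, not from Jupyter.
It's a server, after all...
Previously I ran everything in a Jupyter Notebook inside a Docker container.
Now I create and run the Feast Server in the local environment.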

1. Running the Feast Server locally


Local feature server

- Running the server

pip install feast
feast init feature_repo
cd feature_repo
feast apply

feast materialize-incremental $(date +%Y-%m-%d)
feast serve
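
The `$(date +%Y-%m-%d)` part of the materialize command is a shell substitution that expands to today's date. As a minimal sketch of the same step driven from Python (the helper name `materialize_command` is my own and not part of Feast), the command line could be assembled like this:

```python
from datetime import date
from typing import List, Optional


def materialize_command(end_date: Optional[date] = None) -> List[str]:
    """Build the argv list for `feast materialize-incremental <end date>`.

    Hypothetical helper: it mirrors the shell's $(date +%Y-%m-%d) substitution
    by formatting the date as YYYY-MM-DD, defaulting to today.
    """
    end = end_date if end_date is not None else date.today()
    return ["feast", "materialize-incremental", end.strftime("%Y-%m-%d")]
```

The returned list can be handed to `subprocess.run(materialize_command(), cwd="feature_repo", check=True)` once Feast is installed; building the list separately makes it easy to print and inspect the command first.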

- Running a query with curl

Open a new terminal for this, because the server is still running in the existing one.
# example curl
curl -X POST \
  "http://localhost:6566/get-online-features" \
  -d '{
    "features": [
      "driver_hourly_stats:conv_rate",
      "driver_hourly_stats:acc_rate",
      "driver_hourly_stats:avg_daily_trips"
    ],
    "entities": {
      "driver_id": [1001, 1002, 1003]
    }
  }'
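
The same query can also be sent from Python instead of curl. The sketch below uses only the standard library and assumes the server is reachable at the address used in the curl example; the function names `build_body` and `get_online_features` are my own, not part of Feast.

```python
import json
import urllib.request
from typing import Dict, List

# Assumption: `feast serve` is listening on the same address as in the curl example.
SERVER_URL = "http://localhost:6566/get-online-features"


def build_body(features: List[str], entities: Dict[str, list]) -> bytes:
    """Serialize the request in the same JSON shape as the curl example."""
    return json.dumps({"features": features, "entities": entities}).encode("utf-8")


def get_online_features(
    features: List[str],
    entities: Dict[str, list],
    url: str = SERVER_URL,
    timeout: float = 10.0,
) -> dict:
    """POST the query to the feature server and return the parsed JSON response."""
    request = urllib.request.Request(
        url,
        data=build_body(features, entities),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, calling `get_online_features(["driver_hourly_stats:conv_rate"], {"driver_id": [1001, 1002, 1003]})` should return the same JSON that curl prints.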

2. Running the Feast Server in a Docker container


MLOps work relies on Kubernetes and Docker images, so we go back to the container environment.
Feast does not provide a container environment of its own...
So let's build one.

- Build the docker image


Creating requirements.txt


Create requirements.txt in the directory you want to build from:
~/mlops/docker/requirements.txt
# requirements.txt
feast
scikit-learn
mlflow
pandas

Creating the Dockerfile

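# syntax=docker/dockerfile:1
FROM jupyter/base-notebook
WORKDIR /home/jovyan
# Copy the build context (requirements.txt, feature_server.py, ...) into the image
COPY . /home/jovyan

RUN pip3 install -r requirements.txt

USER jovyan
# Create the sample repo and load features into the online store at build time
RUN feast init feature_repo && \
    cd feature_repo && \
    feast apply && \
    feast materialize-incremental $(date +%Y-%m-%d)

# Overwrite the installed Feast feature server with the feature_server.py shown below.
# The path is hard-coded for Python 3.9; adjust it if the base image ships another version.
COPY feature_server.py /opt/conda/lib/python3.9/site-packages/feast/feature_server.py
# Note: feast serve may bind to 127.0.0.1 by default; if the published port is
# unreachable from the host, try adding --host 0.0.0.0 to the command.
CMD [ "/bin/sh", "-c", "cd /home/jovyan/feature_repo && feast serve"]

WORKDIR /home/jovyan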
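
The `COPY feature_server.py ...` line hard-codes the `python3.9` site-packages path, which only matches if the base image ships that Python version. As a rough sketch for checking where a package actually lives inside the container, the path can be looked up with the standard library; `package_file` is a hypothetical helper name of my own.

```python
import importlib.util
from pathlib import Path


def package_file(package: str, filename: str) -> Path:
    """Return the path that `filename` would have inside an installed package."""
    spec = importlib.util.find_spec(package)
    # spec is None if the package is missing; origin is None for namespace packages.
    if spec is None or spec.origin is None:
        raise ModuleNotFoundError(f"package {package!r} has no importable location")
    # For a regular package, origin points at its __init__.py.
    return Path(spec.origin).parent / filename
```

Running `package_file("feast", "feature_server.py")` inside the container prints the real target path, which can then be compared against the one written in the Dockerfile.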

feature_server.py


Create a FastAPI server that serves the features.
import json
import traceback
import warnings

import click
import uvicorn
import pandas as pd

from fastapi import FastAPI, HTTPException, Request
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse
from pydantic import BaseModel

import feast
from feast import proto_json
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
from feast.type_map import feast_value_type_to_python_type


# TODO: deprecate this in favor of push features
class WriteToFeatureStoreRequest(BaseModel):
    feature_view_name: str
    df: dict
    allow_registry_cache: bool = True


class PushFeaturesRequest(BaseModel):
    push_source_name: str
    df: dict
    allow_registry_cache: bool = True


def get_app(store: "feast.FeatureStore"):
    proto_json.patch()

    app = FastAPI()

    async def get_body(request: Request):
        return await request.body()

    @app.post("/get-online-features")
    def get_online_features(body=Depends(get_body)):
        try:
            # Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
            request_proto = GetOnlineFeaturesRequest()
            Parse(body, request_proto)

            # Initialize parameters for FeatureStore.get_online_features(...) call
            if request_proto.HasField("feature_service"):
                features = store.get_feature_service(
                    request_proto.feature_service, allow_cache=True
                )
            else:
                features = list(request_proto.features.val)

            full_feature_names = request_proto.full_feature_names

            batch_sizes = [len(v.val) for v in request_proto.entities.values()]
            num_entities = batch_sizes[0]
            if any(batch_size != num_entities for batch_size in batch_sizes):
                raise HTTPException(status_code=500, detail="Uneven number of columns")

            entity_rows = [
                {
                    k: feast_value_type_to_python_type(v.val[idx])
                    for k, v in request_proto.entities.items()
                }
                for idx in range(num_entities)
            ]
            
            response_proto = store._get_online_features(
                features=features,
                entity_rows=entity_rows,
                entity_values=request_proto.entities,
                full_feature_names=full_feature_names,
                native_entity_values=False,
            ).proto

            # Convert the Protobuf object to JSON and return it
            return MessageToDict(  # type: ignore
                response_proto, preserving_proto_field_name=True, float_precision=18
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/push")
    def push(body=Depends(get_body)):
        try:
            request = PushFeaturesRequest(**json.loads(body))
            df = pd.DataFrame(request.df)
            store.push(
                push_source_name=request.push_source_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/write-to-online-store")
    def write_to_online_store(body=Depends(get_body)):
        warnings.warn(
            "write_to_online_store is an experimental feature. "
            "This API is unstable and it could be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )
        try:
            request = WriteToFeatureStoreRequest(**json.loads(body))
            df = pd.DataFrame(request.df)
            store.write_to_online_store(
                feature_view_name=request.feature_view_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    return app


def start_server(store: "feast.FeatureStore", host: str, port: int, no_access_log: bool):
    app = get_app(store)
    click.echo(
        "This is an "
        + click.style("experimental", fg="yellow", bold=True, underline=True)
        + " feature. It's intended for early testing and feedback, and could change without warnings in future releases."
    )
    uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))
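
The least obvious step in `get_online_features` above is how the column-oriented `entities` from the request become one dict per entity row, after checking that every column has the same length. A standalone sketch of that step using plain Python lists instead of protobuf values (the name `columns_to_rows` is my own; unlike the server code, it returns an empty list for an empty request and raises `ValueError` rather than `HTTPException`):

```python
from typing import Any, Dict, List


def columns_to_rows(entities: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn {"key": [v0, v1, ...]} columns into one dict per entity row."""
    batch_sizes = [len(values) for values in entities.values()]
    if not batch_sizes:
        return []
    num_entities = batch_sizes[0]
    # Every entity column must describe the same number of rows.
    if any(size != num_entities for size in batch_sizes):
        raise ValueError("Uneven number of columns")
    return [
        {key: values[idx] for key, values in entities.items()}
        for idx in range(num_entities)
    ]
```

For the curl request shown earlier, `columns_to_rows({"driver_id": [1001, 1002, 1003]})` yields three rows, one per driver.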

docker build

# Build the image that runs the server.py specified in the Dockerfile
#            set the tag        | build from the files in the current directory
docker build --tag feast-docker .

Run the feast docker container

docker run -d --name feast-jupyter -p 8888:8888 -p 6566:6566 -p 5001:5001 -e JUPYTER_TOKEN='password' \
-v "$PWD":/home/jovyan/jupyter \
--user root \
-it feast-docker:latest
Query the server with the same curl command used earlier.
Additionally, start Jupyter Lab:
docker exec -it feast-jupyter start.sh jupyter lab &
jupyter-lab
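
Right after `docker run -d` the server inside the container may still be starting, so the first curl call can fail. One possible way to wait before querying is a small TCP check, sketched below with the standard library (`wait_for_port` is a hypothetical helper of my own). Note that a successful connection only shows that something is listening on the published port, not that Feast is ready to answer.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. connection refused) on failure.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For the container above, `wait_for_port("localhost", 6566)` would be called before sending the query.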

3. Feast Feature Store


Adding a FeatureService


Add the following code to feature_repo/example.py
from feast import FeatureService
driver_fs = FeatureService(name="driver_ranking_fv_svc",
                           features=[driver_hourly_stats_view],
                           tags={"description": "Used for training an ElasticNet model"})
Then run feast apply again.
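
Once the FeatureService is registered, the feature server can be queried by service name instead of listing each feature, since `feature_server.py` branches on the `feature_service` field of the request. A minimal sketch of building that request body follows; the helper name is my own, and the JSON key is assumed to match the protobuf field name checked in the server code.

```python
import json
from typing import Dict


def build_feature_service_body(feature_service: str, entities: Dict[str, list]) -> str:
    """Build a /get-online-features body that names a FeatureService."""
    # "feature_service" replaces the "features" list used in the curl example.
    return json.dumps({"feature_service": feature_service, "entities": entities})
```

The resulting string can be passed to curl's `-d` option in place of the earlier body, for example with `build_feature_service_body("driver_ranking_fv_svc", {"driver_id": [1001, 1002, 1003]})`.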
  • Main Feast commands
  • feast --help
    feast feature-views list
    feast feature-services list
    feast feature-services describe <feature_service_name>
    feast entities list
    
    feast teardown ## Caution: this deletes everything