Feast Log #3
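Creating the Feast Server
Create it in the Ubuntu CLI environment, not in Jupyter.
It's a server, after all...
Previously, I ran Feast from a Jupyter Notebook in a Docker container environment.
This time, I create and run the Feast Server in a local environment.
1. Run the Feast Server locally
Local feature server
- Start the server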
pip install feast
feast init feature_repo
cd feature_repo
feast apply
feast materialize-incremental $(date +%Y-%m-%d)
feast serve
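The materialization step above can also be driven from Python instead of the shell. The following is a minimal sketch, assuming the Feast Python SDK's `FeatureStore` class and its `materialize_incremental` method; the helper names `parse_cli_date` and `materialize_until` are hypothetical and only used for illustration.

```python
from datetime import datetime


def parse_cli_date(date_str: str) -> datetime:
    """Parse a YYYY-MM-DD string, the format produced by `date +%Y-%m-%d`."""
    return datetime.strptime(date_str, "%Y-%m-%d")


def materialize_until(repo_path: str, date_str: str) -> None:
    """Load features into the online store up to the given date."""
    # Imported lazily so the date helper works even without Feast installed.
    from feast import FeatureStore

    store = FeatureStore(repo_path=repo_path)
    store.materialize_incremental(end_date=parse_cli_date(date_str))


# Usage (assumes the current directory is the feature_repo created by
# `feast init` and that `feast apply` has already been run there):
# materialize_until(".", datetime.now().strftime("%Y-%m-%d"))
```

The Feast import sits inside the function so that the date parsing can be checked on its own.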
- Run a query with curl
Open a new terminal, because the existing one is occupied by the running server.
# example curl
curl -X POST \
  "http://localhost:6566/get-online-features" \
  -d '{
    "features": [
      "driver_hourly_stats:conv_rate",
      "driver_hourly_stats:acc_rate",
      "driver_hourly_stats:avg_daily_trips"
    ],
    "entities": {
      "driver_id": [1001, 1002, 1003]
    }
  }'
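The same query can be sent from Python. This is a minimal sketch using only the standard library; the helper names `build_payload` and `get_online_features` are hypothetical, and the URL is the one used in the curl example above.

```python
import json
import urllib.request


def build_payload(features, entities):
    """Build the JSON body expected by the /get-online-features endpoint."""
    return {
        "features": list(features),
        "entities": {name: list(values) for name, values in entities.items()},
    }


def get_online_features(payload, url="http://localhost:6566/get-online-features"):
    """POST the payload to the feature server and return the decoded JSON."""
    data = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(url, data=data, method="POST")
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


payload = build_payload(
    [
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    {"driver_id": [1001, 1002, 1003]},
)

# With `feast serve` running in another terminal:
# print(get_online_features(payload))
```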
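2. Run the Feast Server in a Docker container
MLOps relies on Kubernetes and Docker images, so we return to a container environment.
Feast does not provide a container environment of its own...
Let's take a look at how to build one.
- Build the docker image
Create requirements.txt
Create requirements.txt in the directory you will build from: ~/mlops/dokcer/requirements.txt
# requirements.txt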
feast
scikit-learn
mlflow
pandas
Create the Dockerfile
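# syntax=docker/dockerfile:1
FROM jupyter/base-notebook
WORKDIR /home/jovyan
COPY . /home/jovyan
# Install the Python dependencies listed in requirements.txt
RUN pip3 install -r requirements.txt
USER jovyan
# Create the sample feature repository and load features into the online store
RUN feast init feature_repo && \
    cd feature_repo && \
    feast apply && \
    feast materialize-incremental $(date +%Y-%m-%d)
# Overwrite the installed Feast feature server with the custom feature_server.py
COPY feature_server.py /opt/conda/lib/python3.9/site-packages/feast/feature_server.py
# Start the feature server from the feature repository
CMD [ "/bin/sh", "-c", "cd /home/jovyan/feature_repo && feast serve"]
WORKDIR /home/jovyan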
feature_server.py
Create a FastAPI server that serves the features.
import json
import traceback
import warnings

import click
import uvicorn
import pandas as pd
from fastapi import FastAPI, HTTPException, Request
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse
from pydantic import BaseModel

import feast
from feast import proto_json
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
from feast.type_map import feast_value_type_to_python_type


# TODO: deprecate this in favor of push features
class WriteToFeatureStoreRequest(BaseModel):
    feature_view_name: str
    df: dict
    allow_registry_cache: bool = True


class PushFeaturesRequest(BaseModel):
    push_source_name: str
    df: dict
    allow_registry_cache: bool = True


def get_app(store: "feast.FeatureStore"):
    proto_json.patch()

    app = FastAPI()

    async def get_body(request: Request):
        return await request.body()

    @app.post("/get-online-features")
    def get_online_features(body=Depends(get_body)):
        try:
            # Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
            request_proto = GetOnlineFeaturesRequest()
            Parse(body, request_proto)

            # Initialize parameters for FeatureStore.get_online_features(...) call
            if request_proto.HasField("feature_service"):
                features = store.get_feature_service(
                    request_proto.feature_service, allow_cache=True
                )
            else:
                features = list(request_proto.features.val)

            full_feature_names = request_proto.full_feature_names

            batch_sizes = [len(v.val) for v in request_proto.entities.values()]
            num_entities = batch_sizes[0]
            if any(batch_size != num_entities for batch_size in batch_sizes):
                raise HTTPException(status_code=500, detail="Uneven number of columns")

            entity_rows = [
                {
                    k: feast_value_type_to_python_type(v.val[idx])
                    for k, v in request_proto.entities.items()
                }
                for idx in range(num_entities)
            ]

            response_proto = store._get_online_features(
                features=features,
                entity_rows=entity_rows,
                entity_values=request_proto.entities,
                full_feature_names=full_feature_names,
                native_entity_values=False,
            ).proto

            # Convert the Protobuf object to JSON and return it
            return MessageToDict(  # type: ignore
                response_proto, preserving_proto_field_name=True, float_precision=18
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/push")
    def push(body=Depends(get_body)):
        try:
            request = PushFeaturesRequest(**json.loads(body))
            df = pd.DataFrame(request.df)
            store.push(
                push_source_name=request.push_source_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/write-to-online-store")
    def write_to_online_store(body=Depends(get_body)):
        warnings.warn(
            "write_to_online_store is an experimental feature. "
            "This API is unstable and it could be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )
        try:
            request = WriteToFeatureStoreRequest(**json.loads(body))
            df = pd.DataFrame(request.df)
            store.write_to_online_store(
                feature_view_name=request.feature_view_name,
                df=df,
                allow_registry_cache=request.allow_registry_cache,
            )
        except Exception as e:
            # Print the original exception on the server side
            logger.exception(traceback.format_exc())
            # Raise HTTPException to return the error message to the client
            raise HTTPException(status_code=500, detail=str(e))

    return app


def start_server(store: "feast.FeatureStore", host: str, port: int, no_access_log: bool):
    app = get_app(store)
    click.echo(
        "This is an "
        + click.style("experimental", fg="yellow", bold=True, underline=True)
        + " feature. It's intended for early testing and feedback, and could change without warnings in future releases."
    )
    uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))
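The `/get-online-features` handler receives entities as columns (one list per entity key) and turns them into one row per entity before querying the store. The following is a simplified sketch of that pivot on plain Python values, without the Protobuf types. The function name `columns_to_rows` is hypothetical, and the empty-input guard is an added assumption that is not present in the handler.

```python
from typing import Any, Dict, List


def columns_to_rows(entities: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Pivot {"key": [v1, v2, ...]} into [{"key": v1}, {"key": v2}, ...]."""
    batch_sizes = [len(values) for values in entities.values()]
    if not batch_sizes:
        # Assumption for this sketch: no entity keys means no rows.
        return []
    num_entities = batch_sizes[0]
    # Every entity key must supply the same number of values.
    if any(size != num_entities for size in batch_sizes):
        raise ValueError("Uneven number of columns")
    return [
        {name: values[idx] for name, values in entities.items()}
        for idx in range(num_entities)
    ]


rows = columns_to_rows({"driver_id": [1001, 1002, 1003]})
```

For the request used in this post, `rows` holds one dictionary per driver ID.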
docker build
# Runs the server.py specified in the Dockerfile
# --tag sets the image tag | . builds from the files in the current directory
docker build --tag feast-docker .
Run the feast docker container
docker run -d --name feast-jupyter -p 8888:8888 -p 6566:6566 -p 5001:5001 -e JUPYTER_TOKEN='password' \
  -v "$PWD":/home/jovyan/jupyter \
  --user root \
  -it feast-docker:latest
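Because the container starts in the background, it can help to confirm that port 6566 accepts connections before sending a query. This is a minimal sketch using only the standard library; the function name `wait_for_port` and its default timings are hypothetical. An open port only shows that something is listening on the host side, not that the feature server inside the container is fully ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection succeeds, False after the timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Usage, after `docker run` has started the container:
# print(wait_for_port("localhost", 6566))
```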
Use the same curl command as before.
Additionally start Jupyter Lab:
docker exec -it feast-jupyter start.sh jupyter lab &
3. Feast Feature Store
Add a FeatureService
Add the following code to feature_repo/example.py:
from feast import FeatureService
driver_fs = FeatureService(name="driver_ranking_fv_svc",
                           features=[driver_hourly_stats_view],
                           tags={"description": "Used for training an ElasticNet model"})
Apply the changes to Feast again.
feast --help
feast feature-views list
feast feature-services list
feast feature-services describe <feature_service_name>
feast entities list
feast teardown ## Caution: this deletes everything
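Once the FeatureService has been applied, it can be requested by name instead of listing individual features, since the `/get-online-features` handler in feature_server.py branches on the `feature_service` field. This is a minimal sketch of such a request body; the helper name `build_feature_service_payload` is hypothetical, and the service name is the one defined above.

```python
import json


def build_feature_service_payload(service_name, entities):
    """Build a request body that selects features through a FeatureService."""
    return {
        "feature_service": service_name,
        "entities": {name: list(values) for name, values in entities.items()},
    }


payload = build_feature_service_payload(
    "driver_ranking_fv_svc",
    {"driver_id": [1001, 1002, 1003]},
)
body = json.dumps(payload)

# The body can be sent with the same curl command as before, for example:
# curl -X POST "http://localhost:6566/get-online-features" -d "$BODY"
```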
Reference
For more information on this topic (Feast Log #3), see https://velog.io/@zoody/Feast-Log-3. This text may be freely shared or copied, provided that the URL of this document is kept as a reference URL.
Collection and Share based on the CC Protocol