Source code for feast.loaders.file

# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import tempfile
import uuid
from datetime import datetime
from typing import List, Optional, Tuple, Union
from urllib.parse import urlparse

import pandas as pd
from pandavro import to_avro

from feast.staging.storage_client import get_staging_client


def export_source_to_staging_location(
    source: Union[pd.DataFrame, str], staging_location_uri: str
) -> List[str]:
    """
    Uploads a DataFrame as an Avro file to a remote staging location.

    The local staging location specified in this function is used for E2E
    tests, please do not use it.

    Args:
        source (Union[pd.DataFrame, str]):
            Source of data to be staged. Can be a pandas DataFrame or a file
            path.

            The following types of source are allowed:
                * Pandas DataFrame
                * Local Avro file
                * GCS Avro file
                * S3 Avro file
                * Azure Blob storage Avro file

        staging_location_uri (str):
            Remote staging location where DataFrame should be written.
            Examples:
                * gs://bucket/path/
                * s3://bucket/path/
                * wasbs://bucket@account_name.blob.core.windows.net/path/
                * file:///data/subfolder/

    Returns:
        List[str]:
            Returns a list containing the full path to the file(s) in the
            remote staging location. When ``source`` is a URI with a
            non-local scheme, nothing is uploaded and the result of the
            staging client's ``list_files`` call for that URI is returned
            instead.

    Raises:
        Exception: If ``source`` is neither a string nor a DataFrame.
        AssertionError: If ``source`` is a non-local URI without a hostname.
    """
    uri = urlparse(staging_location_uri)

    # Prepare Avro file to be exported to staging location
    if isinstance(source, pd.DataFrame):
        # DataFrame provided as a source: write it to a local Avro file first
        dir_path, file_name, source_path = export_dataframe_to_local(df=source)
    elif isinstance(source, str):
        source_uri = urlparse(source)
        if source_uri.scheme in ["", "file"]:
            # Local file provided as a source
            dir_path = ""
            file_name = os.path.basename(source)
            source_path = os.path.abspath(
                os.path.join(source_uri.netloc, source_uri.path)
            )
        else:
            # gs, s3, azure blob file provided as a source.
            assert source_uri.hostname is not None
            return get_staging_client(source_uri.scheme).list_files(uri=source_uri)
    else:
        raise Exception(
            f"Only string and DataFrame types are allowed as a "
            f"source, {type(source)} was provided."
        )

    try:
        # Push data to required staging location
        with open(source_path, "rb") as f:
            get_staging_client(uri.scheme).upload_fileobj(
                f,
                source_path,
                remote_uri=uri._replace(
                    path=str(uri.path).strip("/") + "/" + file_name
                ),
            )
    finally:
        # Clean up, remove local staging file. Done in ``finally`` so that the
        # temporary directory created for a DataFrame source is not leaked
        # when opening or uploading the file fails. Only directories created
        # for a DataFrame source are removed; user-supplied files are kept.
        if dir_path and isinstance(source, pd.DataFrame) and len(dir_path) > 4:
            shutil.rmtree(dir_path)

    return [staging_location_uri.rstrip("/") + "/" + file_name]
def export_dataframe_to_local(
    df: pd.DataFrame, dir_path: Optional[str] = None
) -> Tuple[str, str, str]:
    """
    Exports a pandas DataFrame to the local filesystem.

    While the file is being written, any column named ``datetime`` is
    presented to the Avro writer as ``event_timestamp``. The DataFrame's
    original column index is restored before returning, whether or not the
    write succeeds.

    Args:
        df (pd.DataFrame):
            Pandas DataFrame to save.

        dir_path (Optional[str]):
            Absolute directory path '/data/project/subfolder/'. When omitted,
            a new temporary directory is created with ``tempfile.mkdtemp``.

    Returns:
        Tuple[str, str, str]:
            Tuple of directory path, file name and destination path. The
            destination path can be obtained by concatenating the directory
            path and file name.
    """
    # Create local staging location if not provided
    if dir_path is None:
        dir_path = tempfile.mkdtemp()

    file_name = _get_file_name()
    dest_path = f"{dir_path}/{file_name}"

    # Keep the caller's exact column index so it can be restored verbatim.
    # Renaming "event_timestamp" back to "datetime" afterwards would also
    # rename columns that were called "event_timestamp" to begin with.
    original_columns = df.columns

    # Temporarily rename datetime column to event_timestamp. Ideally we would
    # force the schema with our avro writer instead.
    df.columns = [
        "event_timestamp" if col == "datetime" else col for col in original_columns
    ]

    try:
        # Export dataset to file in local path
        to_avro(df=df, file_path_or_buffer=dest_path)
    finally:
        # Restore the original column names, even if the export failed
        df.columns = original_columns

    return dir_path, file_name, dest_path
def _get_file_name() -> str: """ Create a random file name. Returns: str: Randomised file name. """ return f'{datetime.now().strftime("%d-%m-%Y_%I-%M-%S_%p")}_{str(uuid.uuid4())[:8]}.avro'