Source code for feast.data_source

# Copyright 2020 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import enum
from typing import Dict, Optional

from feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.data_format import FileFormat, StreamFormat


class SourceType(enum.Enum):
    """
    Enumeration of the source types a DataSource can be declared as.

    ``UNKNOWN`` is the zero value; the remaining members name the batch and
    stream sources.
    """

    UNKNOWN = 0

    # Batch sources
    BATCH_FILE = 1
    BATCH_BIGQUERY = 2

    # Stream sources
    STREAM_KAFKA = 3
    STREAM_KINESIS = 4
class FileOptions:
    """
    File options of a DataSource: the file format and the file url that
    features are sourced from.
    """

    def __init__(
        self, file_format: FileFormat, file_url: str,
    ):
        self._file_url = file_url
        self._file_format = file_format

    @property
    def file_format(self):
        """
        Returns the file format of this file
        """
        return self._file_format

    @file_format.setter
    def file_format(self, file_format):
        """
        Sets the file format of this file
        """
        self._file_format = file_format

    @property
    def file_url(self):
        """
        Returns the file url of this file
        """
        return self._file_url

    @file_url.setter
    def file_url(self, file_url):
        """
        Sets the file url of this file
        """
        self._file_url = file_url

    @classmethod
    def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
        """
        Builds a FileOptions object from its protobuf representation.

        Args:
            file_options_proto: A protobuf representation of file options

        Returns:
            A FileOptions object populated from the file_options protobuf
        """
        return cls(
            file_format=FileFormat.from_proto(file_options_proto.file_format),
            file_url=file_options_proto.file_url,
        )

    def to_proto(self) -> DataSourceProto.FileOptions:
        """
        Converts this FileOptions object to its protobuf representation.

        Returns:
            FileOptionsProto protobuf
        """
        return DataSourceProto.FileOptions(
            file_format=self.file_format.to_proto(), file_url=self.file_url,
        )
class BigQueryOptions:
    """
    BigQuery options of a DataSource: the reference of the table that
    features are sourced from.
    """

    def __init__(
        self, table_ref: str,
    ):
        self._table_ref = table_ref

    @property
    def table_ref(self):
        """
        Returns the table ref of this BQ table
        """
        return self._table_ref

    @table_ref.setter
    def table_ref(self, table_ref):
        """
        Sets the table ref of this BQ table
        """
        self._table_ref = table_ref

    @classmethod
    def from_proto(cls, bigquery_options_proto: DataSourceProto.BigQueryOptions):
        """
        Builds a BigQueryOptions object from its protobuf representation.

        Args:
            bigquery_options_proto: A protobuf representation of BigQuery options

        Returns:
            A BigQueryOptions object populated from the bigquery_options protobuf
        """
        return cls(table_ref=bigquery_options_proto.table_ref)

    def to_proto(self) -> DataSourceProto.BigQueryOptions:
        """
        Converts this BigQueryOptions object to its protobuf representation.

        Returns:
            BigQueryOptionsProto protobuf
        """
        return DataSourceProto.BigQueryOptions(table_ref=self.table_ref)
class KafkaOptions:
    """
    Kafka options of a DataSource: bootstrap servers, message format and the
    topic that features are sourced from.
    """

    def __init__(
        self, bootstrap_servers: str, message_format: StreamFormat, topic: str,
    ):
        self._topic = topic
        self._message_format = message_format
        self._bootstrap_servers = bootstrap_servers

    @property
    def bootstrap_servers(self):
        """
        Returns a comma-separated list of Kafka bootstrap servers
        """
        return self._bootstrap_servers

    @bootstrap_servers.setter
    def bootstrap_servers(self, bootstrap_servers):
        """
        Sets a comma-separated list of Kafka bootstrap servers
        """
        self._bootstrap_servers = bootstrap_servers

    @property
    def message_format(self):
        """
        Returns the data format that is used to encode the feature data in Kafka messages
        """
        return self._message_format

    @message_format.setter
    def message_format(self, message_format):
        """
        Sets the data format that is used to encode the feature data in Kafka messages
        """
        self._message_format = message_format

    @property
    def topic(self):
        """
        Returns the Kafka topic to collect feature data from
        """
        return self._topic

    @topic.setter
    def topic(self, topic):
        """
        Sets the Kafka topic to collect feature data from
        """
        self._topic = topic

    @classmethod
    def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
        """
        Builds a KafkaOptions object from its protobuf representation.

        Args:
            kafka_options_proto: A protobuf representation of Kafka options

        Returns:
            A KafkaOptions object populated from the kafka_options protobuf
        """
        return cls(
            bootstrap_servers=kafka_options_proto.bootstrap_servers,
            message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
            topic=kafka_options_proto.topic,
        )

    def to_proto(self) -> DataSourceProto.KafkaOptions:
        """
        Converts this KafkaOptions object to its protobuf representation.

        Returns:
            KafkaOptionsProto protobuf
        """
        return DataSourceProto.KafkaOptions(
            bootstrap_servers=self.bootstrap_servers,
            message_format=self.message_format.to_proto(),
            topic=self.topic,
        )
class KinesisOptions:
    """
    Kinesis options of a DataSource: record format, AWS region and the name
    of the stream that features are sourced from.
    """

    def __init__(
        self, record_format: StreamFormat, region: str, stream_name: str,
    ):
        self._stream_name = stream_name
        self._region = region
        self._record_format = record_format

    @property
    def record_format(self):
        """
        Returns the data format used to encode the feature data in the Kinesis records.
        """
        return self._record_format

    @record_format.setter
    def record_format(self, record_format):
        """
        Sets the data format used to encode the feature data in the Kinesis records.
        """
        self._record_format = record_format

    @property
    def region(self):
        """
        Returns the AWS region of Kinesis stream
        """
        return self._region

    @region.setter
    def region(self, region):
        """
        Sets the AWS region of Kinesis stream
        """
        self._region = region

    @property
    def stream_name(self):
        """
        Returns the Kinesis stream name to obtain feature data from
        """
        return self._stream_name

    @stream_name.setter
    def stream_name(self, stream_name):
        """
        Sets the Kinesis stream name to obtain feature data from
        """
        self._stream_name = stream_name

    @classmethod
    def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions):
        """
        Builds a KinesisOptions object from its protobuf representation.

        Args:
            kinesis_options_proto: A protobuf representation of Kinesis options

        Returns:
            A KinesisOptions object populated from the kinesis_options protobuf
        """
        return cls(
            record_format=StreamFormat.from_proto(kinesis_options_proto.record_format),
            region=kinesis_options_proto.region,
            stream_name=kinesis_options_proto.stream_name,
        )

    def to_proto(self) -> DataSourceProto.KinesisOptions:
        """
        Converts this KinesisOptions object to its protobuf representation.

        Returns:
            KinesisOptionsProto protobuf
        """
        return DataSourceProto.KinesisOptions(
            record_format=self.record_format.to_proto(),
            region=self.region,
            stream_name=self.stream_name,
        )
class DataSource:
    """
    DataSource that can be used to source features.

    Holds the settings shared by the concrete sources in this module: the
    event and created timestamp columns, the field mapping and the date
    partition column.
    """

    def __init__(
        self,
        event_timestamp_column: str,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        # A missing or empty mapping is replaced by a fresh empty dict.
        self._field_mapping = field_mapping or {}
        self._event_timestamp_column = event_timestamp_column
        self._created_timestamp_column = created_timestamp_column
        self._date_partition_column = date_partition_column

    def __eq__(self, other):
        if not isinstance(other, DataSource):
            raise TypeError("Comparisons should only involve DataSource class objects.")

        # Equal only when none of the shared settings differ.
        return not (
            self.event_timestamp_column != other.event_timestamp_column
            or self.created_timestamp_column != other.created_timestamp_column
            or self.field_mapping != other.field_mapping
            or self.date_partition_column != other.date_partition_column
        )

    @property
    def field_mapping(self):
        """
        Returns the field mapping of this data source
        """
        return self._field_mapping

    @field_mapping.setter
    def field_mapping(self, field_mapping):
        """
        Sets the field mapping of this data source
        """
        self._field_mapping = field_mapping

    @property
    def event_timestamp_column(self):
        """
        Returns the event timestamp column of this data source
        """
        return self._event_timestamp_column

    @event_timestamp_column.setter
    def event_timestamp_column(self, event_timestamp_column):
        """
        Sets the event timestamp column of this data source
        """
        self._event_timestamp_column = event_timestamp_column

    @property
    def created_timestamp_column(self):
        """
        Returns the created timestamp column of this data source
        """
        return self._created_timestamp_column

    @created_timestamp_column.setter
    def created_timestamp_column(self, created_timestamp_column):
        """
        Sets the created timestamp column of this data source
        """
        self._created_timestamp_column = created_timestamp_column

    @property
    def date_partition_column(self):
        """
        Returns the date partition column of this data source
        """
        return self._date_partition_column

    @date_partition_column.setter
    def date_partition_column(self, date_partition_column):
        """
        Sets the date partition column of this data source
        """
        self._date_partition_column = date_partition_column

    @staticmethod
    def from_proto(data_source):
        """
        Convert data source config in FeatureTable spec to a DataSource class object.

        The option groups are checked in a fixed order (file, BigQuery, Kafka,
        Kinesis); the first group whose required fields are all set decides
        which source class is built.

        Raises:
            ValueError: If no option group is sufficiently populated.
        """

        def shared_settings():
            # Settings every concrete source takes, read only once a source
            # type has been chosen.
            return {
                "field_mapping": data_source.field_mapping,
                "event_timestamp_column": data_source.event_timestamp_column,
                "created_timestamp_column": data_source.created_timestamp_column,
                "date_partition_column": data_source.date_partition_column,
            }

        file_opts = data_source.file_options
        if file_opts.file_format and file_opts.file_url:
            return FileSource(
                file_format=FileFormat.from_proto(file_opts.file_format),
                file_url=file_opts.file_url,
                **shared_settings(),
            )

        bigquery_opts = data_source.bigquery_options
        if bigquery_opts.table_ref:
            return BigQuerySource(
                table_ref=bigquery_opts.table_ref, **shared_settings(),
            )

        kafka_opts = data_source.kafka_options
        if (
            kafka_opts.bootstrap_servers
            and kafka_opts.topic
            and kafka_opts.message_format
        ):
            return KafkaSource(
                bootstrap_servers=kafka_opts.bootstrap_servers,
                message_format=StreamFormat.from_proto(kafka_opts.message_format),
                topic=kafka_opts.topic,
                **shared_settings(),
            )

        kinesis_opts = data_source.kinesis_options
        if (
            kinesis_opts.record_format
            and kinesis_opts.region
            and kinesis_opts.stream_name
        ):
            return KinesisSource(
                record_format=StreamFormat.from_proto(kinesis_opts.record_format),
                region=kinesis_opts.region,
                stream_name=kinesis_opts.stream_name,
                **shared_settings(),
            )

        raise ValueError("Could not identify the source type being added")

    def to_proto(self) -> DataSourceProto:
        """
        Converts this DataSource object to its protobuf representation.

        Not implemented on the base class.
        """
        raise NotImplementedError
class FileSource(DataSource):
    """
    DataSource that sources features from a file.

    Args:
        event_timestamp_column: Event timestamp column of this data source
        file_format: File format of the file
        file_url: File url of the file
        created_timestamp_column: Created timestamp column of this data source
        field_mapping: Field mapping of this data source
        date_partition_column: Date partition column of this data source
    """

    def __init__(
        self,
        event_timestamp_column: str,
        file_format: FileFormat,
        file_url: str,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._file_options = FileOptions(file_format=file_format, file_url=file_url)

    def __eq__(self, other):
        """
        Compares the file options and the settings shared by all data sources.

        Raises:
            TypeError: If ``other`` is not a FileSource.
        """
        if not isinstance(other, FileSource):
            raise TypeError("Comparisons should only involve FileSource class objects.")

        # The shared settings (timestamp columns, field mapping, date partition
        # column) must match too; comparing only the file options would treat
        # differently configured sources over the same file as equal.
        if not super().__eq__(other):
            return False

        if (
            self.file_options.file_url != other.file_options.file_url
            or self.file_options.file_format != other.file_options.file_format
        ):
            return False

        return True

    @property
    def file_options(self):
        """
        Returns the file options of this data source
        """
        return self._file_options

    @file_options.setter
    def file_options(self, file_options):
        """
        Sets the file options of this data source
        """
        self._file_options = file_options

    def to_proto(self) -> DataSourceProto:
        """
        Converts this FileSource object to its protobuf representation.

        Returns:
            DataSourceProto protobuf of type BATCH_FILE
        """
        data_source_proto = DataSourceProto(
            type=DataSourceProto.BATCH_FILE,
            field_mapping=self.field_mapping,
            file_options=self.file_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
class BigQuerySource(DataSource):
    """
    DataSource that sources features from a BigQuery table.

    Args:
        event_timestamp_column: Event timestamp column of this data source
        table_ref: Table ref of the BQ table
        created_timestamp_column: Created timestamp column of this data source
        field_mapping: Field mapping of this data source
        date_partition_column: Date partition column of this data source
    """

    def __init__(
        self,
        event_timestamp_column: str,
        table_ref: str,
        created_timestamp_column: Optional[str] = "",
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._bigquery_options = BigQueryOptions(table_ref=table_ref,)

    def __eq__(self, other):
        """
        Compares the table ref and the settings shared by all data sources.

        Raises:
            TypeError: If ``other`` is not a BigQuerySource.
        """
        if not isinstance(other, BigQuerySource):
            raise TypeError(
                "Comparisons should only involve BigQuerySource class objects."
            )

        # The shared settings (timestamp columns, field mapping, date partition
        # column) must match too; comparing only the table ref would treat
        # differently configured sources over the same table as equal.
        if not super().__eq__(other):
            return False

        if self.bigquery_options.table_ref != other.bigquery_options.table_ref:
            return False

        return True

    @property
    def table_ref(self):
        """
        Returns the table ref held by the bigquery options of this data source
        """
        return self._bigquery_options.table_ref

    @property
    def bigquery_options(self):
        """
        Returns the bigquery options of this data source
        """
        return self._bigquery_options

    @bigquery_options.setter
    def bigquery_options(self, bigquery_options):
        """
        Sets the bigquery options of this data source
        """
        self._bigquery_options = bigquery_options

    def to_proto(self) -> DataSourceProto:
        """
        Converts this BigQuerySource object to its protobuf representation.

        Returns:
            DataSourceProto protobuf of type BATCH_BIGQUERY
        """
        data_source_proto = DataSourceProto(
            type=DataSourceProto.BATCH_BIGQUERY,
            field_mapping=self.field_mapping,
            bigquery_options=self.bigquery_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
class KafkaSource(DataSource):
    """
    DataSource that sources features from Kafka messages.

    Args:
        event_timestamp_column: Event timestamp column of this data source
        bootstrap_servers: Comma-separated list of Kafka bootstrap servers
        message_format: Data format used to encode the feature data in Kafka
            messages
        topic: Kafka topic to collect feature data from
        created_timestamp_column: Created timestamp column of this data source
        field_mapping: Field mapping of this data source
        date_partition_column: Date partition column of this data source
    """

    def __init__(
        self,
        event_timestamp_column: str,
        bootstrap_servers: str,
        message_format: StreamFormat,
        topic: str,
        created_timestamp_column: Optional[str] = "",
        # None instead of a shared mutable ``dict()`` default; the base class
        # turns a missing mapping into a fresh empty dict, so callers see the
        # same result as before.
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._kafka_options = KafkaOptions(
            bootstrap_servers=bootstrap_servers,
            message_format=message_format,
            topic=topic,
        )

    def __eq__(self, other):
        """
        Compares the kafka options and the settings shared by all data sources.

        Raises:
            TypeError: If ``other`` is not a KafkaSource.
        """
        if not isinstance(other, KafkaSource):
            raise TypeError(
                "Comparisons should only involve KafkaSource class objects."
            )

        # The shared settings (timestamp columns, field mapping, date partition
        # column) must match too; comparing only the kafka options would treat
        # differently configured sources over the same topic as equal.
        if not super().__eq__(other):
            return False

        if (
            self.kafka_options.bootstrap_servers
            != other.kafka_options.bootstrap_servers
            or self.kafka_options.message_format != other.kafka_options.message_format
            or self.kafka_options.topic != other.kafka_options.topic
        ):
            return False

        return True

    @property
    def kafka_options(self):
        """
        Returns the kafka options of this data source
        """
        return self._kafka_options

    @kafka_options.setter
    def kafka_options(self, kafka_options):
        """
        Sets the kafka options of this data source
        """
        self._kafka_options = kafka_options

    def to_proto(self) -> DataSourceProto:
        """
        Converts this KafkaSource object to its protobuf representation.

        Returns:
            DataSourceProto protobuf of type STREAM_KAFKA
        """
        data_source_proto = DataSourceProto(
            type=DataSourceProto.STREAM_KAFKA,
            field_mapping=self.field_mapping,
            kafka_options=self.kafka_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto
class KinesisSource(DataSource):
    """
    DataSource that sources features from Kinesis records.

    Unlike the other sources in this module, ``created_timestamp_column`` is a
    required argument here; the parameter order is kept for existing callers.

    Args:
        event_timestamp_column: Event timestamp column of this data source
        created_timestamp_column: Created timestamp column of this data source
        record_format: Data format used to encode the feature data in the
            Kinesis records
        region: AWS region of the Kinesis stream
        stream_name: Kinesis stream name to obtain feature data from
        field_mapping: Field mapping of this data source
        date_partition_column: Date partition column of this data source
    """

    def __init__(
        self,
        event_timestamp_column: str,
        created_timestamp_column: str,
        record_format: StreamFormat,
        region: str,
        stream_name: str,
        # None instead of a shared mutable ``dict()`` default; the base class
        # turns a missing mapping into a fresh empty dict, so callers see the
        # same result as before.
        field_mapping: Optional[Dict[str, str]] = None,
        date_partition_column: Optional[str] = "",
    ):
        super().__init__(
            event_timestamp_column,
            created_timestamp_column,
            field_mapping,
            date_partition_column,
        )
        self._kinesis_options = KinesisOptions(
            record_format=record_format, region=region, stream_name=stream_name
        )

    def __eq__(self, other):
        """
        Compares the kinesis options and the settings shared by all data sources.

        Raises:
            TypeError: If ``other`` is not a KinesisSource.
        """
        if not isinstance(other, KinesisSource):
            raise TypeError(
                "Comparisons should only involve KinesisSource class objects."
            )

        # The shared settings (timestamp columns, field mapping, date partition
        # column) must match too; comparing only the kinesis options would
        # treat differently configured sources over the same stream as equal.
        if not super().__eq__(other):
            return False

        if (
            self.kinesis_options.record_format != other.kinesis_options.record_format
            or self.kinesis_options.region != other.kinesis_options.region
            or self.kinesis_options.stream_name != other.kinesis_options.stream_name
        ):
            return False

        return True

    @property
    def kinesis_options(self):
        """
        Returns the kinesis options of this data source
        """
        return self._kinesis_options

    @kinesis_options.setter
    def kinesis_options(self, kinesis_options):
        """
        Sets the kinesis options of this data source
        """
        self._kinesis_options = kinesis_options

    def to_proto(self) -> DataSourceProto:
        """
        Converts this KinesisSource object to its protobuf representation.

        Returns:
            DataSourceProto protobuf of type STREAM_KINESIS
        """
        data_source_proto = DataSourceProto(
            type=DataSourceProto.STREAM_KINESIS,
            field_mapping=self.field_mapping,
            kinesis_options=self.kinesis_options.to_proto(),
        )

        data_source_proto.event_timestamp_column = self.event_timestamp_column
        data_source_proto.created_timestamp_column = self.created_timestamp_column
        data_source_proto.date_partition_column = self.date_partition_column

        return data_source_proto