Source code for feast.source

# Copyright 2019 The Feast Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
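# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.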
# See the License for the specific language governing permissions and
# limitations under the License.
from feast.core.Source_pb2 import KafkaSourceConfig as KafkaSourceConfigProto
from feast.core.Source_pb2 import Source as SourceProto
from feast.core.Source_pb2 import SourceType as SourceTypeProto

class Source:
    """
    Source is the top level class that represents a data source for finding
    feature data. Source must be extended with specific implementations to
    be useful
    """

    def __eq__(self, other):
        """
        Compares this source with another object.

        Two sources are considered equal when both are Source instances that
        report the same ``source_type``. Subclasses that carry additional
        state are expected to override this method.

        Args:
            other: Object to compare against

        Returns:
            True or False when ``other`` is a Source; ``NotImplemented``
            otherwise, so that Python falls back to its default comparison
            instead of treating every object as equal to a Source.
        """
        if not isinstance(other, Source):
            return NotImplemented
        return self.source_type == other.source_type

    @property
    def source_type(self) -> str:
        """
        The type of source. If not implemented, this will return the string
        "None" (not the ``None`` singleton)
        """
        return "None"

    def to_proto(self):
        """
        Converts this source object to its protobuf representation.

        The base class has no protobuf representation and returns None.
        """
        return None

    @classmethod
    def from_proto(cls, source_proto: "SourceProto"):
        """
        Creates a source from a protobuf representation. This will
        instantiate and return a specific source type, depending on the
        protobuf that is passed in.

        Args:
            source_proto: SourceProto python object

        Returns:
            Source object. A KafkaSource when the protobuf type is KAFKA;
            otherwise an instance of ``cls`` created with no arguments.
        """
        if source_proto.type == SourceTypeProto.KAFKA:
            kafka_config = source_proto.kafka_source_config
            return KafkaSource(
                brokers=kafka_config.bootstrap_servers,
                topic=kafka_config.topic,
            )

        # Unknown or unset source type: fall back to a default instance.
        return cls()
class KafkaSource(Source):
    """
    Kafka feature set source type.
    """

    def __init__(self, brokers: str = "", topic: str = ""):
        """
        Args:
            brokers: Comma separated list of Kafka brokers/bootstrap server
                addresses, for example: my-host:9092,other-host:9092
            topic: Kafka topic to find feature rows for this feature set
        """
        self._source_type = "Kafka"
        self._brokers = brokers
        self._topic = topic

    def __eq__(self, other):
        """
        Compares this Kafka source with another object.

        Args:
            other: Object to compare against

        Returns:
            True when ``other`` is a KafkaSource with the same brokers, topic
            and source type. False when ``other`` is any other kind of
            Source. ``NotImplemented`` for unrelated objects, so comparing
            with e.g. None yields False instead of raising AttributeError.
        """
        if not isinstance(other, KafkaSource):
            # Answer False directly for non-Kafka sources rather than
            # deferring to the other operand's comparison.
            return False if isinstance(other, Source) else NotImplemented

        return (
            self.brokers == other.brokers
            and self.topic == other.topic
            and self.source_type == other.source_type
        )

    @property
    def brokers(self) -> str:
        """
        Returns the list of broker addresses for this Kafka source
        """
        return self._brokers

    @property
    def topic(self) -> str:
        """
        Returns the topic for this feature set
        """
        return self._topic

    @property
    def source_type(self) -> str:
        """
        Returns the type of source. For a Kafka source this will always
        return "Kafka"
        """
        return self._source_type

    def to_proto(self) -> SourceProto:
        """
        Converts this Source into its protobuf representation

        Returns:
            SourceProto of type KAFKA whose kafka_source_config carries this
            source's brokers (as bootstrap_servers) and topic.
        """
        kafka_config = KafkaSourceConfigProto(
            bootstrap_servers=self.brokers, topic=self.topic
        )
        return SourceProto(
            type=SourceTypeProto.KAFKA, kafka_source_config=kafka_config
        )