# Copyright 2019 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
from tqdm import tqdm
class AbstractProducer:
    """
    Base class for Kafka producers.

    Concrete subclasses must override :meth:`produce` and :meth:`flush`;
    this class only tracks row-level ingestion progress via a tqdm bar.
    """

    def __init__(self, brokers: str, row_count: int, disable_progress_bar: bool):
        self.brokers = brokers
        self.row_count = row_count
        # smoothing=0 makes tqdm always display the average ingestion rate.
        self.pbar = tqdm(
            total=row_count, unit="rows", smoothing=0, disable=disable_progress_bar
        )

    def produce(self, topic: str, data: bytes):
        """Push a byte-encoded payload to a Kafka topic. Must be overridden."""
        raise NotImplementedError(
            f"{self.__class__.__name__} should implement a produce method"
        )

    def flush(self, timeout: int):
        """Block until queued messages are delivered. Must be overridden."""
        raise NotImplementedError(
            f"{self.__class__.__name__} should implement a flush method"
        )

    def _inc_pbar(self, meta):
        # Delivery callback hook: advance the progress bar by one row.
        self.pbar.update(1)

    def _set_error(self, exception: str):
        # Surface an asynchronous delivery failure as a synchronous exception.
        raise Exception(exception)

    def print_results(self) -> None:
        """
        Print ingestion statistics.

        Returns:
            None: None
        """
        # Flush any pending bar updates, then release the tqdm handle
        # before writing the plain-text summary.
        self.pbar.refresh()
        self.pbar.close()
        print("Ingestion complete!")
        print(f"\nIngestion statistics:\nSuccess: {self.pbar.n}/{self.row_count}")
        return None
class ConfluentProducer(AbstractProducer):
    """
    Kafka producer backed by the confluent-kafka client library.
    """

    def __init__(self, brokers: str, row_count: int, disable_progress_bar: bool):
        # Imported lazily so that this module loads even when
        # confluent-kafka is not installed (see get_producer fallback).
        from confluent_kafka import Producer

        self.producer = Producer({"bootstrap.servers": brokers})
        super().__init__(brokers, row_count, disable_progress_bar)

    def produce(self, topic: str, value: bytes) -> None:
        """
        Push a byte-encoded object onto a Kafka topic using
        confluent-kafka's asynchronous produce API.

        Args:
            topic (str): Kafka topic.
            value (bytes): Byte encoded object.

        Returns:
            None: None.
        """
        try:
            self.producer.produce(topic, value=value, callback=self._delivery_callback)
            # Poll once to serve queued delivery callbacks. Because
            # produce() is asynchronous, this poll() will most likely
            # not serve the callback of the message just enqueued.
            self.producer.poll(0)
        except Exception as ex:
            # Convert any client-side failure into a raised Exception.
            self._set_error(str(ex))

        return None

    def flush(self, timeout: Optional[int]):
        """
        Wait for all queued messages to be delivered via
        confluent-kafka's flush.

        Args:
            timeout (Optional[int]): Timeout in seconds to wait for completion.

        Returns:
            int: Number of messages still in queue.
        """
        remaining = self.producer.flush(timeout=timeout)
        if remaining:
            raise Exception("Not all Kafka messages are successfully delivered.")
        return remaining

    def _delivery_callback(self, err: str, msg) -> None:
        """
        Per-message delivery callback fired by poll() or flush() once a
        message is delivered or has permanently failed (after retries).

        The msg argument is unused, but the two-argument signature is
        required by the confluent-kafka documentation.

        Args:
            err (str): Error message.
            msg (): Kafka message.

        Returns:
            None
        """
        if not err:
            self._inc_pbar(None)
        else:
            self._set_error(err)
class KafkaPythonProducer(AbstractProducer):
    """
    Kafka producer backed by the kafka-python client library.
    """

    def __init__(self, brokers: str, row_count: int, disable_progress_bar: bool):
        # Imported lazily so the module loads without kafka-python installed.
        from kafka import KafkaProducer

        self.producer = KafkaProducer(bootstrap_servers=[brokers])
        super().__init__(brokers, row_count, disable_progress_bar)

    def produce(self, topic: str, value: bytes):
        """
        Push a byte-encoded object onto a Kafka topic using
        kafka-python's send API.

        Args:
            topic (str): Kafka topic.
            value (bytes): Byte encoded object.

        Returns:
            FutureRecordMetadata: resolves to RecordMetadata

        Raises:
            KafkaTimeoutError: if unable to fetch topic metadata, or unable
                to obtain memory buffer prior to configured max_block_ms
        """
        future = self.producer.send(topic, value=value)
        # add_callback/add_errback return the future, so the original
        # chained form and this sequential form yield the same object.
        future.add_callback(self._inc_pbar)
        future.add_errback(self._set_error)
        return future

    def flush(self, timeout: Optional[int]):
        """
        Wait for all buffered records to be sent via kafka-python's flush.

        Args:
            timeout (Optional[int]): timeout in seconds to wait for completion.

        Returns:
            None

        Raises:
            KafkaTimeoutError: failure to flush buffered records within the
                provided timeout
        """
        remaining = self.producer.flush(timeout=timeout)
        if remaining:
            raise Exception("Not all Kafka messages are successfully delivered.")
        return remaining
def get_producer(
    brokers: str, row_count: int, disable_progress_bar: bool
) -> Union[ConfluentProducer, KafkaPythonProducer]:
    """
    Simple context helper function that returns an AbstractProducer object when
    invoked.

    This helper function will try to import confluent-kafka as a producer first.
    This helper function will fallback to kafka-python if it fails to import
    confluent-kafka. If kafka-python is also unavailable, the resulting
    ImportError propagates to the caller.

    Args:
        brokers (str): Kafka broker information with hostname and port.
        row_count (int): Number of rows in table.
        disable_progress_bar (bool): Whether to suppress the tqdm progress bar.

    Returns:
        Union[ConfluentProducer, KafkaPythonProducer]:
            Concrete implementation of a Kafka producer. It can be:
            * confluent-kafka producer
            * kafka-python producer
    """
    try:
        return ConfluentProducer(brokers, row_count, disable_progress_bar)
    except ImportError:
        # ConfluentProducer imports confluent-kafka inside __init__, so a
        # missing dependency surfaces here rather than at module import time.
        print("Unable to import confluent-kafka, falling back to kafka-python")
        return KafkaPythonProducer(brokers, row_count, disable_progress_bar)