|
NAMEKafka::Consumer - Perl interface for Kafka consumer client. VERSIONThis documentation refers to "Kafka::Consumer" version 0.8010 . SYNOPSIS use 5.010;
use strict;
use warnings;
use Scalar::Util qw(
blessed
);
use Try::Tiny;
use Kafka qw(
$DEFAULT_MAX_BYTES
$DEFAULT_MAX_NUMBER_OF_OFFSETS
$RECEIVE_EARLIEST_OFFSETS
);
use Kafka::Connection;
use Kafka::Consumer;
my ( $connection, $consumer );
try {
#-- Connection
$connection = Kafka::Connection->new( host => 'localhost' );
#-- Consumer
$consumer = Kafka::Consumer->new( Connection => $connection );
# Get a list of valid offsets up max_number before the given time
my $offsets = $consumer->offsets(
'mytopic', # topic
0, # partition
$RECEIVE_EARLIEST_OFFSETS, # time
$DEFAULT_MAX_NUMBER_OF_OFFSETS # max_number
);
if( @$offsets ) {
say "Received offset: $_" foreach @$offsets;
} else {
warn "Error: Offsets are not received\n";
}
# Consuming messages
my $messages = $consumer->fetch(
'mytopic', # topic
0, # partition
0, # offset
$DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
);
if ( $messages ) {
foreach my $message ( @$messages ) {
if( $message->valid ) {
say 'payload : ', $message->payload;
say 'key : ', $message->key;
say 'offset : ', $message->offset;
say 'next_offset: ', $message->next_offset;
} else {
say 'error : ', $message->error;
}
}
}
} catch {
if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
warn 'Error: (', $_->code, ') ', $_->message, "\n";
exit;
} else {
die $_;
}
};
# Closes the consumer and cleans up
undef $consumer;
undef $connection;
DESCRIPTIONKafka consumer API is implemented by "Kafka::Consumer" class. The main features of the "Kafka::Consumer" class are:
The Kafka consumer response returns ARRAY references for "offsets" and "fetch" methods. Array returned by "offsets" contains offset integers. Array returned by "fetch" contains objects of Kafka::Message class. CONSTRUCTOR"new" Creates a new consumer client object. Returns the created "Kafka::Consumer" object. new() takes arguments in key-value pairs. The following arguments are recognized:
METHODSThe following methods are defined for the "Kafka::Consumer" class: "fetch( $topic, $partition, $start_offset, $max_size )" Get a list of messages to consume one by one up to $max_size bytes. Returns the reference to array of the Kafka::Message objects. fetch() takes the following arguments:
"offsets( $topic, $partition, $time, $max_number )" Get a list of valid offsets up to $max_number before the given time. Returns reference to array of the offset integers (Math::BigInt integers on 32 bit system). offsets() takes the following arguments:
DIAGNOSTICSWhen error is detected, an exception, represented by object of "Kafka::Exception::Consumer" class, is thrown (see Kafka::Exceptions). code and a more descriptive message provide information about thrown exception. Consult documentation of the Kafka::Exceptions for the list of all available methods. Authors suggest using of Try::Tiny's "try" and "catch" to handle exceptions while working with Kafka package.
SEE ALSOThe basic operation of the Kafka package modules: Kafka - constants and messages used by the Kafka package modules. Kafka::Connection - interface to connect to a Kafka cluster. Kafka::Producer - interface for producing client. Kafka::Consumer - interface for consuming client. Kafka::Message - interface to access Kafka message properties. Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems. Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol. Kafka::IO - low-level interface for communication with Kafka server. Kafka::Exceptions - module designated to handle Kafka exceptions. Kafka::Internals - internal constants and functions used by several package modules. A wealth of detail about the Apache Kafka and the Kafka Protocol: Main page at <http://kafka.apache.org/> Kafka Protocol at <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol> SOURCE CODEKafka package is hosted on GitHub: <https://github.com/TrackingSoft/Kafka> AUTHORSergey Gladkov, <sgladkov@trackingsoft.com> CONTRIBUTORSAlexander Solovey Jeremy Jordan Sergiy Zuban Vlad Marchenko COPYRIGHT AND LICENSECopyright (C) 2012-2013 by TrackingSoft LLC. This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at <http://dev.perl.org/licenses/artistic.html>. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|