GSP
Quick Navigator

Search Site

Unix VPS
A - Starter
B - Basic
C - Preferred
D - Commercial
MPS - Dedicated
Previous VPSs
* Sign Up! *

Support
Contact Us
Online Help
Handbooks
Domain Status
Man Pages

FAQ
Virtual Servers
Pricing
Billing
Technical

Network
Facilities
Connectivity
Topology Map

Miscellaneous
Server Agreement
Year 2038
Credits
 

USA Flag

 

 

Man Pages
Kafka::Consumer(3) User Contributed Perl Documentation Kafka::Consumer(3)

Kafka::Consumer - Perl interface for Kafka consumer client.

This documentation refers to "Kafka::Consumer" version 0.8010 .

    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;

    use Kafka qw(
        $DEFAULT_MAX_BYTES
        $DEFAULT_MAX_NUMBER_OF_OFFSETS
        $RECEIVE_EARLIEST_OFFSETS
    );
    use Kafka::Connection;
    use Kafka::Consumer;

    my ( $connection, $consumer );
    try {

        #-- Connection
        $connection = Kafka::Connection->new( host => 'localhost' );

        #-- Consumer
        $consumer = Kafka::Consumer->new( Connection  => $connection );

        # Get a list of valid offsets up max_number before the given time
        my $offsets = $consumer->offsets(
            'mytopic',                      # topic
            0,                              # partition
            $RECEIVE_EARLIEST_OFFSETS,      # time
            $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
        );

        if( @$offsets ) {
            say "Received offset: $_" foreach @$offsets;
        } else {
            warn "Error: Offsets are not received\n";
        }

        # Consuming messages
        my $messages = $consumer->fetch(
            'mytopic',                      # topic
            0,                              # partition
            0,                              # offset
            $DEFAULT_MAX_BYTES              # Maximum size of MESSAGE(s) to receive
        );

        if ( $messages ) {
            foreach my $message ( @$messages ) {
                if( $message->valid ) {
                    say 'payload    : ', $message->payload;
                    say 'key        : ', $message->key;
                    say 'offset     : ', $message->offset;
                    say 'next_offset: ', $message->next_offset;
                } else {
                    say 'error      : ', $message->error;
                }
            }
        }

    } catch {
        if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $_->code, ') ',  $_->message, "\n";
            exit;
        } else {
            die $_;
        }
    };

    # Closes the consumer and cleans up
    undef $consumer;
    undef $connection;

Kafka consumer API is implemented by "Kafka::Consumer" class.

The main features of the "Kafka::Consumer" class are:

  • Provides an object-oriented API for consuming messages.
  • Provides Kafka FETCH and OFFSETS requests.
  • Supports parsing the Apache Kafka 0.8 Wire Format protocol.
  • Works with 64-bit elements of the Kafka Wire Format protocol on 32 bit systems.

The Kafka consumer response returns ARRAY references for "offsets" and "fetch" methods.

Array returned by "offsets" contains offset integers.

Array returned by "fetch" contains objects of Kafka::Message class.

"new"

Creates a new consumer client object. Returns the created "Kafka::Consumer" object.

"new()" takes arguments in key-value pairs. The following arguments are recognized:

"Connection => $connection"
$connection is the Kafka::Connection object responsible for communication with the Apache Kafka cluster.
"CorrelationId => $correlation_id"
Optional, default is "undef".

"Correlation" is a user-supplied integer. It will be passed back in the response by the server, unmodified. The $correlation_id should be an integer number.

"CorrelationId" will be auto-assigned (random negative number) if it was not provided on creation of Kafka::Producer object.

An exception is thrown if "CorrelationId" sent with request does not match "CorrelationId" received in response.

"ClientId => $client_id"
This is a user supplied identifier (string) for the client application.

"ClientId" will be auto-assigned if not passed in when creating Kafka::Producer object.

"MaxWaitTime => $max_time"
The maximum amount of time (ms) to wait when no sufficient data is available at the time the request was issued.

Optional, default is $DEFAULT_MAX_WAIT_TIME.

$DEFAULT_MAX_WAIT_TIME is the default time that can be imported from the Kafka module.

The $max_time must be a positive integer.

"MinBytes => $min_bytes"
The minimum number of bytes of messages that must be available to give a response. If the client sets this to $MIN_BYTES_RESPOND_IMMEDIATELY the server will always respond immediately. If it is set to $MIN_BYTES_RESPOND_HAS_DATA, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. Setting higher values in combination with the bigger timeouts allows reading larger chunks of data.

Optional, int32 signed integer, default is $MIN_BYTES_RESPOND_IMMEDIATELY.

$MIN_BYTES_RESPOND_IMMEDIATELY, $MIN_BYTES_RESPOND_HAS_DATA are the defaults that can be imported from the Kafka module.

The $min_bytes must be a non-negative int32 signed integer.

"MaxBytes => $max_bytes"
The maximum bytes to include in the message set for this partition.

Optional, int32 signed integer, default = $DEFAULT_MAX_BYTES (1_000_000).

The $max_bytes must be more than $MESSAGE_SIZE_OVERHEAD (size of protocol overhead - data added by Kafka wire protocol to each message).

$DEFAULT_MAX_BYTES, $MESSAGE_SIZE_OVERHEAD are the defaults that can be imported from the Kafka module.

"MaxNumberOfOffsets => $max_number"
Limit the number of offsets returned by Kafka.

That is a non-negative integer.

Optional, int32 signed integer, default = $DEFAULT_MAX_NUMBER_OF_OFFSETS (100).

$DEFAULT_MAX_NUMBER_OF_OFFSETS is the default that can be imported from the Kafka module.

The following methods are defined for the "Kafka::Consumer" class:

"fetch( $topic, $partition, $start_offset, $max_size )"

Get a list of messages to consume one by one up to $max_size bytes.

Returns the reference to array of the Kafka::Message objects.

"fetch()" takes the following arguments:

$topic
The $topic must be a normal non-false string of non-zero length.
$partition
The $partition must be a non-negative integer.
$start_offset
Offset in topic and partition to start from (64-bit integer).

The argument must be a non-negative integer. The argument may be a Math::BigInt integer on 32-bit system.

$max_size
$max_size is the maximum size of the messages set to return. The argument must be a positive int32 signed integer.

The maximum size of a request limited by "MAX_SOCKET_REQUEST_BYTES" that can be imported from Kafka module.

"offsets( $topic, $partition, $time, $max_number )"

Get a list of valid offsets up to $max_number before the given time.

Returns reference to array of the offset integers (Math::BigInt integers on 32 bit system).

"offsets()" takes the following arguments:

$topic
The $topic must be a normal non-false string of non-zero length.
$partition
The $partition must be a non-negative integer.
$time
Get offsets before the given time (in milliseconds since UNIX Epoch).

The argument must be a positive number.

The argument may be a Math::BigInt integer on 32 bit system.

The special values $RECEIVE_LATEST_OFFSET (-1), $RECEIVE_EARLIEST_OFFSETS (-2) are allowed.

$RECEIVE_LATEST_OFFSET, $RECEIVE_EARLIEST_OFFSETS are the defaults that can be imported from the Kafka module.

$max_number
$max_number is the maximum number of offsets to retrieve.

Optional. The argument must be a positive int32 signed integer.

When error is detected, an exception, represented by object of "Kafka::Exception::Consumer" class, is thrown (see Kafka::Exceptions).

code and a more descriptive message provide information about thrown exception. Consult documentation of the Kafka::Exceptions for the list of all available methods.

Authors suggest using of Try::Tiny's "try" and "catch" to handle exceptions while working with Kafka package.

"Invalid argument"
Invalid argument passed to a "new" constructor or other method.
"Can't send"
Request cannot be sent.
"Can't recv"
Response cannot be received.
"Can't bind"
TCP connection can't be established on the given host and port.
"Can't get metadata"
Failed to obtain metadata from Kafka servers.
"Leader not found"
Missing information about server-leader in metadata.
"Mismatch CorrelationId"
"CorrelationId" of response doesn't match one in request.
"There are no known brokers"
Resulting metadata has no information about cluster brokers.
"Can't get metadata"
Received metadata has incorrect internal structure.

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at <http://kafka.apache.org/>

Kafka Protocol at <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>

Kafka package is hosted on GitHub: <https://github.com/TrackingSoft/Kafka>

Sergey Gladkov, <sgladkov@trackingsoft.com>

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at <http://dev.perl.org/licenses/artistic.html>.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

2015-02-06 perl v5.32.1

Search for    or go to Top of page |  Section 3 |  Main Index

Powered by GSP Visit the GSP FreeBSD Man Page Interface.
Output converted with ManDoc.