Kafka::Producer(3) User Contributed Perl Documentation Kafka::Producer(3)

Kafka::Producer - Perl interface for Kafka producer client.

This documentation refers to "Kafka::Producer" version 0.8010.

    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;

    use Kafka::Connection;
    use Kafka::Producer;

    my ( $connection, $producer );
    try {

        #-- Connection
        $connection = Kafka::Connection->new( host => 'localhost' );

        #-- Producer
        $producer = Kafka::Producer->new( Connection => $connection );

        # Sending a single message
        my $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            'Single message'    # message
        );

        # Sending a series of messages
        $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            [                   # messages
                'The first message',
                'The second message',
                'The third message',
            ]
        );

    } catch {
        if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $_->code, ') ',  $_->message, "\n";
            exit;
        } else {
            die $_;
        }
    };

    # Closes the producer and cleans up
    undef $producer;
    undef $connection;

The Kafka producer API is implemented by the "Kafka::Producer" class.

The main features of the "Kafka::Producer" class are:

  • Provides an object-oriented API for producing messages.
  • Provides Kafka PRODUCE requests.

"new"

Creates a new producer client object.

"new()" takes arguments in key-value pairs. The following arguments are currently recognized:

"Connection => $connection"
$connection is the Kafka::Connection object responsible for communication with the Apache Kafka cluster.
"CorrelationId => $correlation_id"
Optional, int32 signed integer, default = "undef".

"CorrelationId" is a user-supplied integer. The server passes it back in the response, unmodified. The $correlation_id must be an integer.

If "CorrelationId" is not passed to the constructor, its value is assigned automatically (a random negative integer).

An exception is thrown if the "CorrelationId" sent with a request does not match the "CorrelationId" received in the response.

"ClientId => $client_id"
This is a user-supplied identifier (string) for the client application.

If "ClientId" is not passed to the constructor, it is automatically set to the string 'producer'.

"RequiredAcks => $acks"
The $acks should be an int16 signed integer.

Indicates how many acknowledgements the servers should receive before responding to the request.

If it is $NOT_SEND_ANY_RESPONSE, the server does not send any response.

If it is $WAIT_WRITTEN_TO_LOCAL_LOG, the server waits until the data is written to the local log before sending a response.

If it is $BLOCK_UNTIL_IS_COMMITTED, the server blocks until the message is committed by all in-sync replicas before sending a response.

For any number > 1, the server blocks until it has received that number of acknowledgements.

$NOT_SEND_ANY_RESPONSE, $WAIT_WRITTEN_TO_LOCAL_LOG, $BLOCK_UNTIL_IS_COMMITTED can be imported from the Kafka module.

"Timeout => $timeout"
The maximum time the server may wait to receive the number of acknowledgements specified by "RequiredAcks".

The $timeout is given in seconds and may be any integer or floating-point value not greater than the maximum positive int32 integer.

Optional, default = $REQUEST_TIMEOUT.

$REQUEST_TIMEOUT is the default timeout that can be imported from the Kafka module.
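
The ranges documented for the constructor arguments can be checked before a producer is created. The following is a minimal sketch and not part of the Kafka package: "check_producer_args" is a hypothetical helper, and treating "Connection" as mandatory and "Timeout" as strictly positive are assumptions drawn from the descriptions above.

```perl
use strict;
use warnings;

# Range limits taken from the argument descriptions above.
my $INT32_MIN = -2**31;
my $INT32_MAX = 2**31 - 1;
my $INT16_MIN = -2**15;
my $INT16_MAX = 2**15 - 1;

# Hypothetical helper (not part of Kafka::Producer): returns a list of
# human-readable problems found in the arguments; an empty list means OK.
sub check_producer_args {
    my (%args) = @_;
    my @problems;

    # Assumption: a producer cannot work without a connection object.
    push @problems, 'Connection is required'
        unless defined $args{Connection};

    if ( defined( my $id = $args{CorrelationId} ) ) {
        push @problems, 'CorrelationId must be an int32 integer'
            unless $id =~ /\A-?\d+\z/
            && $id >= $INT32_MIN
            && $id <= $INT32_MAX;
    }

    if ( defined( my $acks = $args{RequiredAcks} ) ) {
        push @problems, 'RequiredAcks must be an int16 integer'
            unless $acks =~ /\A-?\d+\z/
            && $acks >= $INT16_MIN
            && $acks <= $INT16_MAX;
    }

    # Assumption: a timeout of zero or less is not meaningful.
    if ( defined( my $timeout = $args{Timeout} ) ) {
        push @problems, 'Timeout must be a positive number not above int32 max'
            unless $timeout =~ /\A\d+(?:\.\d+)?\z/
            && $timeout > 0
            && $timeout <= $INT32_MAX;
    }

    return @problems;
}
```

If the returned list is empty, the same key-value pairs could then be passed on to "Kafka::Producer->new()".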

The following methods are defined for the "Kafka::Producer" class:

"send( $topic, $partition, $messages, $key, $compression_codec )"

Sends a message or a series of messages over the Kafka::Connection object.

Returns a non-blank value (a reference to a hash describing the server response) if the message is sent successfully.

"send()" takes the following arguments:

$topic
The $topic must be a normal non-false string of non-zero length.
$partition
The $partition must be a non-negative integer.
$messages
The $messages is an arbitrary amount of data (a simple data string or a reference to an array of the data strings).
$key
The $key is an optional message key and must be a string. $key may be used in the producer for partitioning, and it is stored with each message so the consumer knows the partitioning key.
$compression_codec
Optional.

$compression_codec sets the type of compression to apply to $messages, if compression is desired.

Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY. These constants can be imported from the Kafka module.
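
Because $messages may be a reference to an array of strings, a long list of messages can be sent in several smaller requests. The following is a minimal sketch under stated assumptions: "send_in_batches" and its $batch_size parameter are hypothetical and not part of the Kafka package, and $producer is assumed to be any object with a "send" method that takes the arguments described above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the Kafka package): sends @$messages in
# chunks of at most $batch_size, passing $key and $codec through unchanged.
# Returns a reference to an array with one send() result per chunk.
sub send_in_batches {
    my ( $producer, $topic, $partition, $messages, $batch_size, $key, $codec ) = @_;

    die "batch_size must be a positive integer\n"
        unless defined $batch_size && $batch_size =~ /\A[1-9]\d*\z/;

    my @pending = @$messages;    # copy, so the caller's array is untouched
    my @responses;

    # splice() removes up to $batch_size items from the front each time
    # and returns an empty list once nothing is left.
    while ( my @chunk = splice @pending, 0, $batch_size ) {
        push @responses,
            $producer->send( $topic, $partition, [@chunk], $key, $codec );
    }

    return \@responses;
}
```

With a real producer, a call might look like "send_in_batches( $producer, 'mytopic', 0, \@lines, 100 )". Exceptions thrown by "send" propagate to the caller, so messages in later chunks are not sent after a failure.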

When an error is detected, an exception represented by an object of the "Kafka::Exception::Producer" class is thrown (see Kafka::Exceptions).

The "code" and a more descriptive "message" provide information about the thrown exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.

The authors suggest using Try::Tiny's "try" and "catch" to handle exceptions while working with the Kafka package.
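
Where Try::Tiny is not available, similar handling can be approximated with core "eval". The following is a minimal sketch rather than a recommended replacement: "try_send" is a hypothetical helper, not part of the Kafka package. It relies only on the "code" and "message" methods mentioned above and assumes $producer is an object with a "send" method.

```perl
use strict;
use warnings;
use Scalar::Util qw( blessed );

# Hypothetical helper (not part of the Kafka package).
# Returns ( $response, undef ) on success and ( undef, $description ) when a
# Kafka::Exception is caught; any other error is rethrown unchanged.
sub try_send {
    my ( $producer, @send_args ) = @_;

    my $response;
    # The trailing 1 makes eval return true only if send() did not die.
    my $ok = eval { $response = $producer->send(@send_args); 1 };
    return ( $response, undef ) if $ok;

    my $error = $@;
    if ( blessed($error) && $error->isa('Kafka::Exception') ) {
        return ( undef,
            sprintf( 'Error: (%s) %s', $error->code, $error->message ) );
    }

    die $error;    # not a Kafka exception: let the caller deal with it
}
```

A caller could then write "my ( $response, $problem ) = try_send( $producer, 'mytopic', 0, 'Single message' );" and inspect $problem when it is defined.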

"Invalid argument"
Invalid arguments were provided to the "new" constructor or to another method.
"Can't send"
Request cannot be sent.
"Can't recv"
Response cannot be received.
"Can't bind"
A TCP connection can't be established to the given host and port.
"Can't get metadata"
An I/O error occurred, errors were found in the structure of the reply, or the reply contains a non-zero error code.
"Description leader not found"
Information about the leader server is missing from the metadata.
"Mismatch CorrelationId"
The "CorrelationId" of the response doesn't match the one in the request.
"There are no known brokers"
Information about brokers in the cluster is missing.
"Can't get metadata"
The obtained metadata is incorrect, or the metadata could not be obtained.

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64-bit elements of the protocol on 32-bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka protocol.

Kafka::IO - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about Apache Kafka and the Kafka protocol:

Main page at <http://kafka.apache.org/>

Kafka Protocol at <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>

Kafka package is hosted on GitHub: <https://github.com/TrackingSoft/Kafka>

Sergey Gladkov, <sgladkov@trackingsoft.com>

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at <http://dev.perl.org/licenses/artistic.html>.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

2015-02-06 perl v5.32.1
