Distributed Systems¶

2021/22¶

Lab 10

Nuno Preguiça, Sérgio Duarte, Dina Borrego, João Vilalonga

Goals¶

By the end of this lab, you should be able to:

  • Understand what Apache Kafka is;
  • Know how to perform publish/subscribe communication using Kafka;
  • Know how to use Kafka to execute operations in total order.

Apache KAFKA¶

Kafka is an open-source distributed event streaming platform.

  • Exposes a topic-based publish/subscribe API.
  • Provides group communication with ordering and reliability guarantees.

Apache KAFKA Architecture¶

Apache KAFKA - Reliability¶

Events are persisted in a log kept in stable secondary storage (disk).

  • The log survives broker crashes;
  • The log is pruned lazily (e.g., events can remain in the log for days);
  • The log can be replayed from the beginning
    to receive events published while a subscriber was offline;
  • Each topic has its own, independent, event log.

Apache KAFKA - Ordering¶

For performance reasons, the event log of each topic can be partitioned.

Within a given partition of a topic, events are totally ordered.

The event record offset defines the position of an event within its partition.

For topics that use more than one partition, events are only partially ordered: there is no total order across partitions.
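To make these ordering guarantees concrete, here is a small in-memory model of a partitioned topic. It is not Kafka code: the class name PartitionedLogModel is hypothetical, and it maps keys to partitions with String.hashCode as a stand-in (Kafka's default partitioner hashes the serialized key with murmur2). Events with the same key land in the same partition and get consecutive offsets; each partition numbers its offsets independently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one topic split into partitions (not Kafka code).
class PartitionedLogModel {
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionedLogModel(int numPartitions) {
        for (int i = 0; i < numPartitions; i++)
            partitions.add(new ArrayList<>());
    }

    // Maps a key to a partition (stand-in for Kafka's murmur2-based partitioner).
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Appends the event to its partition's log and returns its offset in that log.
    long append(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }

    // Events of one partition, in offset order (the total order within a partition).
    List<String> read(int partition) {
        return List.copyOf(partitions.get(partition));
    }
}
```

Two events with different keys may land in different partitions, and then the model, like Kafka, says nothing about their relative order, which is the partial order described above.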

Apache KAFKA - Maven Dependencies¶

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
</dependencies>

Apache KAFKA - Create Publisher¶

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

static final String TOPIC = "mytopic";
static final String KAFKA_BROKERS = "localhost:9092";
static final String EVENT_DATA_STR = "some event data";

var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

var publisher = new KafkaProducer<String, String>(props);

KafkaProducer represents the event publisher.

To create the publisher, we need some configuration properties:

  • A comma-separated list of broker endpoints (host:port);
  • The serializer classes used to encode event keys and values into raw bytes.

Other configuration properties provide finer control over the behavior of the publisher.
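As an example of such finer control, the sketch below builds producer properties that favour durability. The helper name ProducerProps.reliable is hypothetical; acks and enable.idempotence are standard producer settings, exposed as ProducerConfig.ACKS_CONFIG and ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

class ProducerProps {
    // Hypothetical helper: producer properties that favour durability over latency.
    static Properties reliable(String brokers) {
        var props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Only consider a send complete once all in-sync replicas have stored the event
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Prevent duplicate events in the log when the producer retries a send
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return props;
    }
}
```

The resulting Properties object is passed to new KafkaProducer<String, String>(props) exactly as above.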

Apache KAFKA - Publishing Events¶

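import org.apache.kafka.clients.producer.ProducerRecord;

var publisher = new KafkaProducer<String, String>(props);

// send() is asynchronous: it returns a Future<RecordMetadata>
var promise = publisher.send( new ProducerRecord<>(TOPIC, EVENT_DATA_STR));

// get() blocks until the broker acknowledges the event
var metadata = promise.get();

long offset = metadata.offset();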

Events are published as producer records.

We need to provide:

  • The event topic;
  • The event value;
  • Optionally, the event key and/or partition.

The send operation is asynchronous: it immediately returns a future that will hold the event record metadata once the send completes.

To block until the send is completed, we call get() on the metadata future.

The event record metadata contains some useful information about the event, including its offset.
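Blocking on get() after every send makes publishing sequential. As an alternative, send also accepts a callback that is invoked when the send completes. The sketch below also sets a key, so that all events with the same key go to the same partition and remain ordered. The helper name publishWithCallback is hypothetical; it takes the Producer interface so that it can be exercised with Kafka's MockProducer instead of a live broker.

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

class AsyncPublisher {
    // Hypothetical helper: publishes (key, value) without blocking the caller.
    static void publishWithCallback(Producer<String, String> producer,
                                    String topic, String key, String value) {
        var record = new ProducerRecord<>(topic, key, value);
        producer.send(record, (metadata, exception) -> {
            // With KafkaProducer, this runs on the producer's I/O thread
            if (exception != null)
                System.err.println("send failed: " + exception.getMessage());
            else
                System.out.printf("partition=%d offset=%d%n",
                        metadata.partition(), metadata.offset());
        });
    }
}
```

Callbacks should be short, since slow callbacks delay the processing of other acknowledgements.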

Apache KAFKA - Create Subscriber¶

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

static final String MY_GROUP_ID = "my_group_id";
static final String REPLAY_FROM_BEGINNING = "earliest";
static final String KAFKA_BROKERS = "localhost:9092,kafka:9092";

var props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

props.put(ConsumerConfig.GROUP_ID_CONFIG, MY_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, REPLAY_FROM_BEGINNING);

var consumer = new KafkaConsumer<String, String>(props);

KafkaConsumer represents the event subscriber.

To create the subscriber, we need some configuration properties:

  • A comma-separated list of broker endpoints (host:port);
  • The deserializer classes used to decode raw bytes into event keys and values;
  • The group id of the consumer (if multiple consumers share the same group id, the topic partitions, and thus the consumed events, are split among them);
  • Where to start reading when the group has no committed offset: earliest - from the beginning of the log; latest - only events published from then on.

There are more configuration properties that provide fine control over the behavior of the subscriber.
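Within a consumer group, each partition of a subscribed topic is assigned to exactly one consumer of the group, which is how the events end up split among them. The stdlib-only model below assigns partitions round-robin; this is a simplification (Kafka's actual strategy is configurable through partition.assignment.strategy), and the class name GroupAssignmentModel is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how one group's consumers could share a topic's partitions.
class GroupAssignmentModel {
    // Round-robin: partition p goes to consumer number p % consumers.size().
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers)
            assignment.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++)
            assignment.get(consumers.get(p % consumers.size())).add(p);
        return assignment;
    }
}
```

With more consumers than partitions, some consumers of the group receive no events at all, while a consumer in a different group receives every event independently of this group.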

Apache KAFKA - Subscription and notification¶

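import java.time.Duration;
import java.util.List;

static final String TOPIC = "mytopic";

var consumer = new KafkaConsumer<String, String>(props);

consumer.subscribe(List.of(TOPIC));

// Waits up to 10 seconds for events, then prints each consumer record
consumer.poll(Duration.ofSeconds(10)).forEach( System.out::println );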

The consumer can subscribe to multiple topics, supplied as a list.

Events are delivered as consumer records.

Each consumer record contains the event data (key and value) as well as metadata,
namely the topic and partition it belongs to and its offset within that partition.

Consuming events is a blocking operation: poll blocks for up to the given timeout
and returns the batch of events received (possibly empty), ordered according to the following semantics:

  • unordered among topics;
  • partially ordered among partitions of the same topic;
  • totally ordered within the same topic partition.
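Because ordering only holds per partition, it can help to process a polled batch partition by partition. ConsumerRecords exposes partitions() and records(TopicPartition) for this. The helper offsetsByPartition below is a hypothetical illustration and assumes a single subscribed topic (it keys the result by partition number only).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class PerPartitionProcessing {
    // Hypothetical helper: groups the offsets of a polled batch by partition number.
    // Within each list, records appear in log order (increasing offsets).
    static Map<Integer, List<Long>> offsetsByPartition(ConsumerRecords<String, String> records) {
        Map<Integer, List<Long>> result = new TreeMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<Long> offsets = new ArrayList<>();
            for (ConsumerRecord<String, String> r : records.records(tp)) {
                // A real application would process r.key() / r.value() here, in order
                offsets.add(r.offset());
            }
            result.put(tp.partition(), offsets);
        }
        return result;
    }
}
```

In a consumer, such a helper would typically be called inside a loop such as while (true) { var records = consumer.poll(Duration.ofSeconds(1)); ... }.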

Apache Zookeeper¶

Zookeeper is a centralized service for highly reliable distributed coordination.

It can provide distributed applications with:

  • Naming;
  • Distributed synchronization;
  • Distributed consensus/elections;
  • Configuration information storage.

Apache Zookeeper - Data Model¶

Zookeeper provides a hierarchical tuple space that stores information in a reliable and strongly consistent way.

Some relevant aspects of Zookeeper:

  • Each tuple has a unique name and, optionally, an associated value.
    • In Zookeeper terminology, a tuple is called a znode.
  • Tuples are hierarchical, like the directory structure of a file system.
    • You can have a tuple named 'zoo' with a child tuple named 'one', whose complete name is '/zoo/one'.
  • It is possible to monitor changes to a znode (e.g., to its data or to its list of children)
    and be notified by Zookeeper whenever a change happens.

Zookeeper tuple types¶

There are several kinds of znodes:

  • Persistent znodes survive the crash of the client that created them;
  • Ephemeral znodes are deleted automatically when the session of the client that created them ends (e.g., because the client crashed and its session expired);
  • PERSISTENT_SEQUENTIAL and EPHEMERAL_SEQUENTIAL znodes automatically get a strictly increasing sequence number appended to their names.
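For sequential znodes, Zookeeper appends to the requested name a counter kept by the parent znode, formatted as a 10-digit, zero-padded number (e.g., creating "/path/node-" may yield "/path/node-0000000003"). The stdlib-only sketch below mimics that naming and shows how to recover the sequence number; the class SequentialNames is hypothetical, and since the values Zookeeper actually assigns depend on the parent's history, only their relative order should be relied upon.

```java
// Hypothetical model of how Zookeeper names sequential znodes (not Zookeeper code).
class SequentialNames {
    private int counter = 0; // Zookeeper keeps such a counter per parent znode

    // Appends the next sequence number as a 10-digit, zero-padded suffix.
    String next(String requestedPath) {
        return requestedPath + String.format("%010d", counter++);
    }

    // Extracts the sequence number from a generated name (its last 10 characters).
    static long sequenceOf(String name) {
        return Long.parseLong(name.substring(name.length() - 10));
    }
}
```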

Zookeeper - Maven dependencies¶

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

Zookeeper - Client session¶

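import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

static final int TIMEOUT = 5000;

var connectedSignal = new CountDownLatch(1);

// host: Zookeeper connection string (host:port); TIMEOUT: session timeout in ms
client = new ZooKeeper(host, TIMEOUT, (e) -> {
    // Called whenever the connection state changes
    if (e.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
        connectedSignal.countDown();
    }
});

connectedSignal.await();
// client is now connected...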

Connecting to the Zookeeper server is asynchronous and goes through multiple states.

A CountDownLatch can be used as a barrier, so that the client only proceeds once the connection event has been notified.
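Calling await() without a timeout blocks forever if the server is unreachable. A variant is to wait with a timeout and react when it expires. In the stdlib-only sketch below, simulateConnection is a hypothetical stand-in for the Zookeeper watcher receiving SyncConnected after a delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ConnectBarrier {
    // Waits up to timeoutMs for the connection callback; returns true if it arrived.
    static boolean awaitConnection(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Hypothetical stand-in for the watcher: counts down after delayMs.
    static void simulateConnection(CountDownLatch latch, long delayMs) {
        new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                latch.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}
```

When awaitConnection returns false, the application could, for instance, close the client and retry, instead of hanging indefinitely.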

Zookeeper - Tuple creation¶

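import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

// Creates a znode and returns its actual path, or null on failure
public String createNode(String path, byte[] data, CreateMode mode) {
    try {
        // OPEN_ACL_UNSAFE: no access control on the new znode
        return client.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
    } catch (KeeperException.NodeExistsException x) {
        // The znode already exists: treat creation as idempotent
        return path;
    } catch (Exception x) {
        return null;
    }
}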

Example usages:

createNode("/path", new byte[0], CreateMode.PERSISTENT);

var newPath = createNode("/path/node-", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL);

To create a tuple using a client in a connected state, we supply:

  • the path (full name) of the tuple;
  • the data associated with the tuple;
  • the type of node.

For sequential znodes (e.g., EPHEMERAL_SEQUENTIAL), Zookeeper appends a sequence number to the given path, and the resulting name is returned by the creation operation.

Zookeeper List Tuple Children¶

public List<String> getChildren(String path) {
    try {
        return client.getChildren(path, false);
    } catch (Exception x) {
        x.printStackTrace();
    }
    return Collections.emptyList();
}

Zookeeper List Tuple Children and Watch for Changes¶

public List<String> getChildren(String path, Watcher watcher) {
    try {
        return client.getChildren(path, watcher);
    } catch (Exception x) {
        x.printStackTrace();
    }
    return Collections.emptyList();
}

Example usage:

getChildren("/path", (e) -> {
    // something under '/path' changed...
});
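Watches set through getChildren are one-time triggers: once the watcher has been notified, further changes are only reported if the watch is registered again. A common pattern is to re-register inside the callback, as in the sketch below. The class ChildrenMonitor is hypothetical and assumes an already connected ZooKeeper client.

```java
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical helper that keeps watching the children of one znode.
class ChildrenMonitor implements Watcher {
    private final ZooKeeper client;
    private final String path;
    private volatile List<String> children = Collections.emptyList();

    ChildrenMonitor(ZooKeeper client, String path) {
        this.client = client;
        this.path = path;
    }

    // Reads the current children and (re-)registers this object as the watcher.
    void refresh() throws KeeperException, InterruptedException {
        children = client.getChildren(path, this);
    }

    List<String> children() {
        return children;
    }

    @Override
    public void process(WatchedEvent event) {
        // Only react to changes in the list of children of the watched path
        if (event.getType() != Event.EventType.NodeChildrenChanged)
            return;
        try {
            refresh(); // the watch fired once: register it again
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Changes that happen between the notification and the re-registration are not lost, because refresh() reads the current list; only some intermediate states may go unobserved.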

EXERCISES¶

Kafka¶

Kafka: clients in localhost¶

  • Download and study the sample project provided.

  • Start Kafka with the provided script.

    sh start-kafka.sh localhost

  • Run the KafkaSender and KafkaReceiver examples in Eclipse (or another IDE).

Kafka: clients as docker containers¶

  • Edit KafkaSender and KafkaReceiver and set the KAFKA_BROKERS constant to "kafka:9092".

  • Start Kafka with the provided script.

    sh start-kafka.sh kafka

  • Build the docker image

    mvn clean compile assembly:single docker:build

  • Run KafkaReceiver

    docker run -t --network=sdnet sd2122-aula10-kafka java -cp /home/sd/sd.jar sd2122.aula10.kafka.examples.KafkaReceiver

  • Run KafkaSender

    docker run -t --network=sdnet sd2122-aula10-kafka java -cp /home/sd/sd.jar sd2122.aula10.kafka.examples.KafkaSender msg

Kafka: TP2 State Machine Replication Helper Code¶

  • The provided SyncPoint class is a generic version of the code presented in lectures.

  • Check the TotalOrderExecutor class to see how to use the SyncPoint class.
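The provided SyncPoint class is not reproduced here. As a rough idea of the pattern, the hypothetical sketch below lets the thread that published an operation wait until the thread consuming the Kafka log has executed the operation at that offset and recorded its result. The names SyncPointSketch, waitForResult and setResult are assumptions and may not match the provided class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: pairs an operation (identified by its log offset) with its result.
class SyncPointSketch<T> {
    private final Map<Long, T> results = new HashMap<>();
    private long executedUpTo = -1; // highest offset executed so far

    // Called by the consumer thread after executing the operation at this offset.
    synchronized void setResult(long offset, T result) {
        results.put(offset, result);
        executedUpTo = Math.max(executedUpTo, offset);
        notifyAll();
    }

    // Called by the publishing thread; blocks until that offset has been executed.
    synchronized T waitForResult(long offset) throws InterruptedException {
        while (executedUpTo < offset)
            wait();
        return results.remove(offset);
    }
}
```

Here the offset would be the one obtained from the RecordMetadata after publishing the operation. Results that nobody waits for stay in the map, so a real implementation would also need some form of cleanup.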

Zookeeper¶

  • Download and study the sample project provided.

  • Start Zookeeper with the provided script. Note that Zookeeper is packaged with Kafka.

    sh start-kafka.sh kafka

  • Execute the sample Zookeeper client code.

    docker run -t --network=sdnet sd2122-aula10-zookeeper

Zookeeper: TP2 Primary/backup Replication¶

  • Check the lecture materials on how to perform primary election using Zookeeper.
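One common scheme (to be checked against the lecture materials) is for each replica to create an EPHEMERAL_SEQUENTIAL znode under a shared parent, with the replica owning the znode with the smallest sequence number acting as primary; when the primary's session ends, its ephemeral znode disappears and the next replica takes over. The stdlib-only helper below covers only the selection step on the child names returned by getChildren, which come in no particular order; the class name PrimarySelection is hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class PrimarySelection {
    // Sequence suffix appended by Zookeeper: the last 10 digits of the name.
    static long sequenceOf(String child) {
        return Long.parseLong(child.substring(child.length() - 10));
    }

    // getChildren returns names in no particular order: pick the smallest sequence.
    static Optional<String> primary(List<String> children) {
        return children.stream().min(Comparator.comparingLong(PrimarySelection::sequenceOf));
    }

    // True if the child name created by this replica is currently the primary.
    static boolean isPrimary(String myChild, List<String> children) {
        return primary(children).map(myChild::equals).orElse(false);
    }
}
```

A replica that is not the primary would typically set a watch on the parent's children and re-run this check whenever the list changes.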