1 Aeron For The Working Programmer

1.1 Overview

Aeron is an ultra-efficient message transport library for Java and C++. It is designed to work over unreliable media protocols such as UDP and InfiniBand, and offers ordered messaging and optional reliability (by retransmission of messages in the case of dropped packets). The design and implementation have an extreme emphasis on low-latency communication, making the library ideal for use in applications with realtime requirements such as fast-paced networked multiplayer games, high frequency financial trading, VOIP, video streaming, etc. In particular, the Java implementation is designed such that it will produce no garbage during steady state execution, reducing memory pressure and work for the collector.

This guide is an attempt to describe how to put together a working server that can serve a number of clients concurrently. It is somewhat biased towards the perspective of a developer using Aeron as the networking component of a client/server multiplayer game engine. Specifically, the intention is that the described server configuration will serve a relatively small number of clients (typically fewer than a hundred) concurrently as opposed to serving the tens of thousands of clients concurrently that might be expected of a high-performance web server [1].

This guide is intended to supplement the existing Java programming guide. Familiarity with networking and UDP sockets is assumed, but no prior experience with Aeron is assumed. See the following documents for details:

2 Concepts

2.1 Publications And Subscriptions

The Aeron library works with unidirectional streams of messages known as publications and subscriptions. Intuitively, a publication is a stream of messages to which you can write, and a subscription is a stream of messages from which you can read. This requires a slight adjustment in the way one would usually think about programming within the context of, say, UDP sockets. UDP sockets are bidirectional; the programmer will usually create a UDP socket, bind it to a local address, and then read from that socket to receive datagrams, and write datagrams to that same socket to send messages to peers. Aeron, in contrast, requires the programmer to create separate pairs of publications and subscriptions to write and read messages to and from peers.

2.2 Media Driver

Aeron defines a protocol on top of which the user is expected to implement their own application-specific message protocol. The Aeron protocol handles the details of message transmission, retransmission, ordering, fragmentation, etc, leaving the application free to send whatever messages it needs without having to worry about ensuring those messages actually arrive and arrive in the right order.

The actual low-level handling of the transmission medium (such as UDP sockets) is handled by a module of code known as the media driver. The media driver can be either run standalone as a separate process, or can be embedded within the application. For the examples presented in this guide, we assume that the media driver will be embedded within the application and, therefore, the application is responsible for configuring, starting up, and shutting down the media driver correctly.

3 A Client And Server (Take 1)

As a first step, we’ll write a trivial echo server and client: Clients can connect to the server and send UTF-8 strings, and the server will send them back. More formally, the client will:

  1. Send an initial string HELLO <port>, where <port> is an unsigned decimal integer indicating the port to which responses from the server should be sent.

  2. Once the initial HELLO string has been sent, send an infinite series of arbitrary unsigned 32-bit decimal integers encoded as UTF-8 strings, one string per second.

The server will:

  1. Wait for a client to connect and then read the initial HELLO <port> message.

  2. For each connected client c, the server will read a string s from c and then send back the exact same string s to the source address of c and the port p that c specified in its initial HELLO string.
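
The text format of these messages can be sketched using nothing but the standard library. The names used here (EchoProtocol, hello, parseHello, randomNumber) are hypothetical and are not part of the guide's project, and the restriction of ports to the range 0-65535 is an assumption on our part. The sketch only illustrates how the strings are formed and recognized, not how they are transmitted:

```java
import java.util.OptionalInt;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the text format of the echo protocol. */
final class EchoProtocol
{
  private static final Pattern HELLO = Pattern.compile("HELLO ([0-9]+)");

  private EchoProtocol() { }

  /** Format the initial message announcing the port for responses. */
  static String hello(final int port)
  {
    if (port < 0 || port > 65535) { // assumption: ordinary UDP port range
      throw new IllegalArgumentException("port out of range: " + port);
    }
    return "HELLO " + port;
  }

  /** Parse a HELLO message, returning the port, or empty if malformed. */
  static OptionalInt parseHello(final String message)
  {
    final Matcher matcher = HELLO.matcher(message);
    if (!matcher.matches()) {
      return OptionalInt.empty();
    }
    try {
      final int port = Integer.parseInt(matcher.group(1));
      return port <= 65535 ? OptionalInt.of(port) : OptionalInt.empty();
    } catch (final NumberFormatException e) {
      return OptionalInt.empty(); // too many digits to fit in an int
    }
  }

  /** Format a random 32-bit value as an unsigned decimal string. */
  static String randomNumber(final Random random)
  {
    return Integer.toUnsignedString(random.nextInt());
  }

  public static void main(final String[] args)
  {
    System.out.println(hello(8000));
    System.out.println(parseHello("HELLO 8000"));
    System.out.println(randomNumber(new Random()));
  }
}
```

Returning an empty OptionalInt for a malformed string leaves the decision about what to do with a misbehaving peer to the caller.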

We’ll step through the simplest possible implementation that can work, and then critically assess it with an eye to producing a second, better implementation. No attempt will be made to produce efficient code: we are, after all, sending UTF-8 strings as messages; there will be allocations! For this example code, if any choice must be made between efficiency, correctness, or simplicity, simplicity will be chosen every time.

In order to write a server, the following steps are required:

  1. Start up the media driver.
  2. Create a subscription that will be used to read messages from clients.
  3. Go into a loop, creating new publications/subscriptions for clients as they connect, and reading/writing messages from/to existing clients.

In order to write a client, the following steps are required:

  1. Start up the media driver.
  2. Create a publication that will be used to send messages to the server.
  3. Go into a loop, sending messages to the server and reading back the responses.

For the sake of simplicity, we’ll write the client first. All of the code presented here is available as a Maven project on GitHub, but excerpts will be printed here for explanatory purposes.

3.1 Echo Client

We start by defining an EchoClient class with a static create method that initializes a media driver and an instance of the Aeron library to go with it. The media driver requires a directory on the filesystem within which it creates various temporary files that are memory-mapped to allow efficient thread-safe communication between the separate components of the library. For best results, this directory should reside on a memory-backed filesystem (such as /dev/shm on Linux), but this is not actually required.

The create method we define simply creates and launches Aeron and the media driver. It also takes the local address that the client will use for communication, and the address of the server to which the client will connect. It passes these addresses on to the constructor for later use.

EchoClient.create()

  public static EchoClient create(
    final Path media_directory,
    final InetSocketAddress local_address,
    final InetSocketAddress remote_address)
    throws Exception
  {
    Objects.requireNonNull(media_directory, "media_directory");
    Objects.requireNonNull(local_address, "local_address");
    Objects.requireNonNull(remote_address, "remote_address");

    final String directory =
      media_directory.toAbsolutePath().toString();

    final MediaDriver.Context media_context =
      new MediaDriver.Context()
        .dirDeleteOnStart(true)
        .aeronDirectoryName(directory);

    final Aeron.Context aeron_context =
      new Aeron.Context().aeronDirectoryName(directory);

    MediaDriver media_driver = null;

    try {
      media_driver = MediaDriver.launch(media_context);

      Aeron aeron = null;
      try {
        aeron = Aeron.connect(aeron_context);
      } catch (final Exception e) {
        closeIfNotNull(aeron);
        throw e;
      }

      return new EchoClient(media_driver, aeron, local_address, remote_address);
    } catch (final Exception e) {
      closeIfNotNull(media_driver);
      throw e;
    }
  }

The various Context types contain a wealth of configuration options. For the sake of this example, we’re only interested in setting the directory locations and will otherwise use the default settings.

Because the EchoClient class is the one responsible for starting up the media driver, it’s also responsible for shutting down the media driver and Aeron when necessary. It implements the standard java.io.Closeable interface and does the necessary cleanup in the close method.

EchoClient.close()

  public void close()
  {
    this.aeron.close();
    this.media_driver.close();
  }

Now we need to define a simple (blocking) run method that attempts to connect to the server and then goes into an infinite loop sending and receiving messages. For the sake of producing readable output, we’ll limit ourselves to polling for messages once per second. In real applications with low-latency requirements, this would obviously be completely counterproductive and a more sensible delay would be used [2]. The method is constructed from several parts, so we’ll define each of those as their own methods and then compose them at the end.

Firstly, we need to create a subscription that will be used to receive messages from the remote server. This is somewhat analogous, for those familiar with UDP programming, to opening a UDP socket and binding it to a local address and port. We create the subscription by constructing a channel URI based on the local address given in the create method. We use the convenient ChannelUriStringBuilder class to create a URI string specifying the details of the subscription. We state that we want to use udp as the underlying transport, and that we want the channel to be reliable (meaning that lost messages will be retransmitted, and messages will be delivered to the remote side in the order that they were sent).

We also specify a stream ID when creating the subscription. Aeron is capable of multiplexing several independent streams of messages into a single connection. It’s therefore necessary for the client and server to agree on a stream ID that will be used for communication. In this case we simply pick an arbitrary value of 0x2044f002, but any non-zero 32-bit unsigned value can be used; the choice is entirely up to the application.

EchoClient.setupSubscription()

  private Subscription setupSubscription()
  {
    final String sub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(this.local_address.toString().replaceFirst("^/", ""))
        .build();

    LOG.debug("subscription URI: {}", sub_uri);
    return this.aeron.addSubscription(sub_uri, ECHO_STREAM_ID);
  }

If, for this example, we assume a client at 10.10.1.100 using a local port 8000, the resulting channel URI will look something like:

aeron:udp?endpoint=10.10.1.100:8000|reliable=true
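
The replaceFirst("^/", "") call in the listing above is there because the string form of a socket address created from a literal IP address typically begins with a slash (the part before the slash is the host name, if one is known). As a minimal sketch of an alternative, assuming IPv4 addresses only, the endpoint string can be built from the address components directly. The EndpointStrings class and endpointOf method are hypothetical names, and the URI in main is assembled by hand purely for illustration; the guide itself uses ChannelUriStringBuilder:

```java
import java.net.InetSocketAddress;

/** Hypothetical helper: builds "address:port" strings for channel endpoints. */
final class EndpointStrings
{
  private EndpointStrings() { }

  /** Produce "address:port" without relying on the format of toString(). */
  static String endpointOf(final InetSocketAddress address)
  {
    if (address.isUnresolved()) {
      throw new IllegalArgumentException("unresolved address: " + address);
    }
    // Assumption: an IPv4 address. An IPv6 literal would need to be
    // enclosed in square brackets before appending the port.
    return address.getAddress().getHostAddress() + ":" + address.getPort();
  }

  public static void main(final String[] args)
  {
    // A literal IP address is used, so no name lookup takes place.
    final InetSocketAddress local = new InetSocketAddress("10.10.1.100", 8000);

    // Hand-built for illustration only.
    System.out.println(
      "aeron:udp?endpoint=" + endpointOf(local) + "|reliable=true");
  }
}
```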

We then create a publication that will be used to send messages to the server. The procedure for creating the publication is very similar to that of the subscription, so the explanation won’t be repeated here.

EchoClient.setupPublication()

  private Publication setupPublication()
  {
    final String pub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(this.remote_address.toString().replaceFirst("^/", ""))
        .build();

    LOG.debug("publication URI: {}", pub_uri);
    return this.aeron.addPublication(pub_uri, ECHO_STREAM_ID);
  }

If, for this example, we assume a server at 10.10.1.200 using a local port 9000, the resulting channel URI will look something like:

aeron:udp?endpoint=10.10.1.200:9000|reliable=true

We now define a runLoop method that takes a created publication and subscription and simply loops forever, sending and receiving messages.

EchoClient.runLoop()

  private void runLoop(
    final Subscription sub,
    final Publication pub)
    throws InterruptedException
  {
    final UnsafeBuffer buffer =
      new UnsafeBuffer(BufferUtil.allocateDirectAligned(2048, 16));

    final Random random = new Random();

    /*
     * Try repeatedly to send an initial HELLO message
     */

    while (true) {
      if (pub.isConnected()) {
        if (sendMessage(pub, buffer, "HELLO " + this.local_address.getPort())) {
          break;
        }
      }

      Thread.sleep(1000L);
    }

    /*
     * Send an infinite stream of random unsigned integers.
     */

    final FragmentHandler assembler =
      new FragmentAssembler(EchoClient::onParseMessage);

    while (true) {
      if (pub.isConnected()) {
        sendMessage(pub, buffer, Integer.toUnsignedString(random.nextInt()));
      }
      if (sub.isConnected()) {
        sub.poll(assembler, 10);
      }
      Thread.sleep(1000L);
    }
  }

The method first creates a buffer that will be used to store incoming and outgoing data. Some of the ways in which Aeron achieves a very high degree of performance include using native memory, allocating memory up-front in order to avoid producing garbage during steady-state execution, and eliminating buffer copying as much as is possible on the JVM. In order to assist with this, we allocate a 2KiB direct byte buffer (with 16 byte alignment) to store messages, and use the UnsafeBuffer class from Aeron’s associated data structure package Agrona to get very high-performance (unsafe) memory operations on the given buffer.

The method then sends a string HELLO <port> where <port> is replaced with the port number used to create the subscription earlier. The server is required to address responses to this port and so those will be made available on the subscription that the client opened.

The method then loops forever, polling the subscription for new messages, and sending a random integer string to the publication, waiting for a second at each iteration.

The sendMessage method is a more-or-less uninteresting utility method that simply packs the given string into the buffer we allocated at the start of the method. It does not do any particular error handling: Message sending can fail for the reasons given in the documentation for the Publication.offer() method, and real applications should do the appropriate error handling [3]. Our implementation simply tries to write a message, making up to five attempts in total, before giving up. The method returns true if the message was sent, and logs an error message and returns false otherwise. Better approaches for real applications are discussed later.

EchoClient.sendMessage()

  private static boolean sendMessage(
    final Publication pub,
    final UnsafeBuffer buffer,
    final String text)
  {
    LOG.debug("send: {}", text);

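    // Encode the text and copy the encoded bytes to the start of the buffer.
    final byte[] value = text.getBytes(UTF_8);
    buffer.putBytes(0, value);

    long result = 0L;
    for (int index = 0; index < 5; ++index) {
      // Offer the number of encoded bytes: for non-ASCII text this is
      // larger than text.length(), which counts UTF-16 code units.
      result = pub.offer(buffer, 0, value.length);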
      if (result < 0L) {
        try {
          Thread.sleep(100L);
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        continue;
      }
      return true;
    }

    LOG.error("could not send: {}", Long.valueOf(result));
    return false;
  }
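
One detail of packing text into a buffer is worth trying in isolation: the length handed to the transport is a count of encoded bytes, and this only coincides with String.length() for pure ASCII text. The following sketch uses a standard java.nio.ByteBuffer as a stand-in for Agrona's UnsafeBuffer so that it can run without Aeron on the classpath; the PackText class and pack method are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes text into a preallocated buffer. */
final class PackText
{
  private PackText() { }

  /**
   * Encode text as UTF-8 at the start of the buffer and return the
   * number of bytes written.
   */
  static int pack(final ByteBuffer buffer, final String text)
  {
    final byte[] value = text.getBytes(StandardCharsets.UTF_8);
    if (value.length > buffer.capacity()) {
      throw new IllegalArgumentException("message too large: " + value.length);
    }
    buffer.clear();
    buffer.put(value);
    return value.length;
  }

  public static void main(final String[] args)
  {
    // Allocated once up-front and reused for every message.
    final ByteBuffer buffer = ByteBuffer.allocateDirect(2048);

    final String ascii = "HELLO 8000";
    final String accented = "h\u00e9llo"; // contains one two-byte character

    System.out.println(ascii.length() + " chars, " + pack(buffer, ascii) + " bytes");
    System.out.println(accented.length() + " chars, " + pack(buffer, accented) + " bytes");
  }
}
```

The messages in this guide consist of ASCII digits and the word HELLO, so the two counts happen to agree here, but a protocol carrying arbitrary text cannot rely on that.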

The poll method, defined on the Subscription type, takes a function of type FragmentHandler as an argument. In our case, we pass an Aeron-provided implementation of the FragmentHandler type called FragmentAssembler. The FragmentAssembler class handles the reassembly of fragmented messages and then passes the assembled messages to our own FragmentHandler (in this case, the onParseMessage method). Our code is extremely unlikely to ever send or receive a message that could have exceeded the MTU of the underlying UDP transport. If it does happen, the FragmentAssembler takes care of it.

EchoClient.onParseMessage()

  private static void onParseMessage(
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
  {
    final byte[] buf = new byte[length];
    buffer.getBytes(offset, buf);
    final String response = new String(buf, UTF_8);
    LOG.debug("response: {}", response);
  }
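
To make the idea of reassembly concrete, here is a deliberately simplified sketch of what an assembler does conceptually: it accumulates fragments until the final one arrives and only then hands the complete message to a delegate. This is an illustration only and is not how Aeron's FragmentAssembler is implemented. The SimpleReassembler class is hypothetical, it tracks a single stream rather than one buffer per session, and the begin/end booleans stand in for flags that Aeron carries in each frame header:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Illustrative only: a single-stream message reassembler. */
final class SimpleReassembler
{
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
  private final Consumer<byte[]> delegate;
  private boolean assembling;

  SimpleReassembler(final Consumer<byte[]> delegate)
  {
    this.delegate = delegate;
  }

  /**
   * @param data  the bytes of this fragment
   * @param begin true if this fragment starts a message
   * @param end   true if this fragment finishes a message
   */
  void onFragment(final byte[] data, final boolean begin, final boolean end)
  {
    if (begin && end) {
      this.delegate.accept(data); // unfragmented: pass straight through
      return;
    }
    if (begin) {
      this.pending.reset();
      this.assembling = true;
    }
    if (!this.assembling) {
      return; // a middle or end fragment with no beginning: drop it
    }
    this.pending.write(data, 0, data.length);
    if (end) {
      this.assembling = false;
      this.delegate.accept(this.pending.toByteArray());
      this.pending.reset();
    }
  }

  public static void main(final String[] args)
  {
    final SimpleReassembler assembler = new SimpleReassembler(
      bytes -> System.out.println(new String(bytes, StandardCharsets.UTF_8)));

    // Two fragments that together form a single message.
    assembler.onFragment("HELLO ".getBytes(StandardCharsets.UTF_8), true, false);
    assembler.onFragment("8000".getBytes(StandardCharsets.UTF_8), false, true);
  }
}
```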

Now that we have all of the required pieces, the run method is trivial:

EchoClient.run()

  public void run()
    throws Exception
  {
    try (final Subscription sub = this.setupSubscription()) {
      try (final Publication pub = this.setupPublication()) {
        this.runLoop(sub, pub);
      }
    }
  }

This is the bare minimum that is required to have a working client. For ease of testing, a simple main method can be defined that takes command line arguments:

EchoClient.main()

  public static void main(
    final String[] args)
    throws Exception
  {
    if (args.length < 5) {
      LOG.error("usage: directory local-address local-port remote-address remote-port");
      System.exit(1);
    }

    final Path directory = Paths.get(args[0]);
    final InetAddress local_name = InetAddress.getByName(args[1]);
    final Integer local_port = Integer.valueOf(args[2]);
    final InetAddress remote_name = InetAddress.getByName(args[3]);
    final Integer remote_port = Integer.valueOf(args[4]);

    final InetSocketAddress local_address =
      new InetSocketAddress(local_name, local_port.intValue());
    final InetSocketAddress remote_address =
      new InetSocketAddress(remote_name, remote_port.intValue());

    try (final EchoClient client = create(directory, local_address, remote_address)) {
      client.run();
    }
  }

3.2 Echo Server

The design of the server is not radically different to that of the client. The main differences are in when and where publications and subscriptions are created. The client simply opens a publication/subscription pair to the only peer with which it will communicate (the server) and maintains them until the user has had enough. The server, however, needs to create publications and/or subscriptions in response to clients connecting, and needs to be able to address clients individually in order to send responses.

We structure the EchoServer class similarly to the EchoClient, including the static create method that sets up Aeron and the media driver. The only difference is that the server does not have a remote address as part of its configuration information; it only specifies a local address to which clients will connect.

The run method, however, is different in several aspects.

EchoServer.setupSubscription()

  private Subscription setupSubscription()
  {
    final String sub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(this.local_address.toString().replaceFirst("^/", ""))
        .build();

    LOG.debug("subscription URI: {}", sub_uri);
    return this.aeron.addSubscription(
      sub_uri,
      ECHO_STREAM_ID,
      this::onClientConnected,
      this::onClientDisconnected);
  }

The subscription configured by the server is augmented with a pair of image handlers. An image, in Aeron terminology, is the replication of a publication stream on the subscription side. In other words, when a client creates a publication to talk to the server, the server obtains an image of the client’s publication that contains a subscription from which the server can read. When the client writes a message to its publication, the server can read a message from the subscription in the image.

When an image becomes available, this is our indication that a client has connected. When an image becomes unavailable, this is our indication that a client has disconnected.

We provide the subscription with a pair of method references, this::onClientConnected and this::onClientDisconnected, that will be called when an image becomes available and unavailable, respectively.

EchoServer.onClientConnected()

  private void onClientConnected(
    final Image image)
  {
    final int session = image.sessionId();
    LOG.debug("onClientConnected: {}", image.sourceIdentity());

    this.clients.put(
      Integer.valueOf(session),
      new ServerClient(session, image, this.aeron));
  }

When an image becomes available, we take note of the session ID of the image. This can be effectively used to uniquely identify a client with respect to that particular subscription. We create a new instance of a ServerClient class used to store per-client state on the server, and store it in a map associating session IDs with ServerClient instances. The details of the ServerClient class will be discussed shortly.

Similarly, in the onClientDisconnected method, we find the client that appears to be disconnecting using the session ID of the image, call the close method on the corresponding ServerClient instance, assuming that one exists, and remove the ServerClient instance from the table of clients.

EchoServer.onClientDisconnected()

  private void onClientDisconnected(
    final Image image)
  {
    final int session = image.sessionId();
    LOG.debug("onClientDisconnected: {}", image.sourceIdentity());

    try (final ServerClient client = this.clients.remove(Integer.valueOf(session))) {
      LOG.debug("onClientDisconnected: closing client {}", client);
    } catch (final Exception e) {
      LOG.error("onClientDisconnected: failed to close client: ", e);
    }
  }
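
The listings do not show how the clients field is declared. The log output shown later in this section has onClientConnected running on a thread named aeron-client-conductor while messages are received on the main thread, so the table is touched by more than one thread. A minimal sketch of such a table, assuming a ConcurrentHashMap is an acceptable choice, might look like the following. The ClientRegistry class and its method names are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical table of per-client state, keyed by session ID. */
final class ClientRegistry<C extends AutoCloseable>
{
  // Assumption: a concurrent map, because the image handlers and the
  // polling loop may run on different threads.
  private final Map<Integer, C> clients = new ConcurrentHashMap<>();

  /** Called when an image becomes available. */
  void onConnected(final int session, final C client)
  {
    this.clients.put(Integer.valueOf(session), client);
  }

  /** Called when a message arrives; returns null for unknown sessions. */
  C find(final int session)
  {
    return this.clients.get(Integer.valueOf(session));
  }

  /**
   * Called when an image becomes unavailable. Removes and closes the
   * client, returning false if no client had that session ID.
   */
  boolean onDisconnected(final int session)
  {
    final C client = this.clients.remove(Integer.valueOf(session));
    if (client == null) {
      return false;
    }
    try {
      client.close();
    } catch (final Exception e) {
      throw new IllegalStateException("failed to close client " + session, e);
    }
    return true;
  }
}
```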

The server does not create a publication in the run method as the client did: It defers the creation of publications until clients have connected for reasons that will be discussed shortly.

The complete run method, therefore, looks like this:

EchoServer.run()

  public void run()
    throws Exception
  {
    try (final Subscription sub = this.setupSubscription()) {
      this.runLoop(sub);
    }
  }

The runLoop method on the server is simplified when compared to the analogous method on the client. The method simply polls the main subscription repeatedly:

EchoServer.runLoop()

  private void runLoop(
    final Subscription sub)
    throws InterruptedException
  {
    final FragmentHandler assembler =
      new FragmentAssembler(this::onParseMessage);

    while (true) {
      if (sub.isConnected()) {
        sub.poll(assembler, 10);
      }

      Thread.sleep(100L);
    }
  }

The main difference is the work that now takes place in the onParseMessage method:

EchoServer.onParseMessage()

  private void onParseMessage(
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
  {
    final int session = header.sessionId();

    final ServerClient client = this.clients.get(Integer.valueOf(session));
    if (client == null) {
      LOG.warn(
        "received message from unknown client: {}",
        Integer.valueOf(session));
      return;
    }

    final byte[] buf = new byte[length];
    buffer.getBytes(offset, buf);
    final String message = new String(buf, UTF_8);
    client.onReceiveMessage(message);
  }

We first take the session ID provided to us by the Header value passed to us by Aeron. The session ID is used to look up a client in the table of clients populated by the onClientConnected method. Assuming that a client actually exists with a matching session ID, a UTF-8 string is decoded from the buffer as it is in the EchoClient implementation, but the decoded string is then given to the corresponding ServerClient instance to be processed via its onReceiveMessage method.

Due to the small size of the ServerClient class (it effectively only contains a single method that does interesting work), the code is published here in its entirety:

EchoServer.ServerClient

  private static final class ServerClient implements AutoCloseable
  {
    private static final Pattern HELLO_PATTERN =
      Pattern.compile("HELLO ([0-9]+)");

    private enum State
    {
      INITIAL,
      CONNECTED
    }

    private final int session;
    private final Image image;
    private final Aeron aeron;
    private State state;
    private final UnsafeBuffer buffer;
    private Publication publication;

    ServerClient(
      final int session,
      final Image in_image,
      final Aeron in_aeron)
    {
      this.session = session;
      this.image = Objects.requireNonNull(in_image, "image");
      this.aeron = Objects.requireNonNull(in_aeron, "aeron");
      this.state = State.INITIAL;
      this.buffer = new UnsafeBuffer(
        BufferUtil.allocateDirectAligned(2048, 16));
    }

    @Override
    public void close()
      throws Exception
    {
      closeIfNotNull(this.publication);
    }

    public void onReceiveMessage(
      final String message)
    {
      Objects.requireNonNull(message, "message");

      LOG.debug(
        "receive [0x{}]: {}",
        Integer.toUnsignedString(this.session),
        message);

      switch (this.state) {
        case INITIAL: {
          this.onReceiveMessageInitial(message);
          break;
        }
        case CONNECTED: {
          sendMessage(this.publication, this.buffer, message);
          break;
        }
      }
    }

    private void onReceiveMessageInitial(
      final String message)
    {
      final Matcher matcher = HELLO_PATTERN.matcher(message);
      if (!matcher.matches()) {
        LOG.warn("client sent malformed HELLO message: {}", message);
        return;
      }

      final int port =
        Integer.parseUnsignedInt(matcher.group(1));
      final String source_id =
        this.image.sourceIdentity();

      try {
        final URI source_uri =
          new URI("fake://" + source_id);

        final String address =
          new StringBuilder(64)
            .append(source_uri.getHost())
            .append(":")
            .append(port)
            .toString();

        final String pub_uri =
          new ChannelUriStringBuilder()
            .reliable(TRUE)
            .media("udp")
            .endpoint(address)
            .build();

        this.publication =
          this.aeron.addPublication(pub_uri, ECHO_STREAM_ID);

        this.state = State.CONNECTED;
      } catch (final URISyntaxException e) {
        LOG.warn("client sent malformed HELLO message: {}: ", message, e);
      }
    }
  }

The ServerClient class maintains a State field which may either be CONNECTED or INITIAL. The client begins in the INITIAL state and then transitions to the CONNECTED state after successfully processing the HELLO string that is expected to be sent by connecting clients as their first message. The onReceiveMessage method checks to see if the client is in the INITIAL state or the CONNECTED state. If the client is in the INITIAL state, the message is passed to the onReceiveMessageInitial method.

This method parses what it assumes will be a HELLO string and constructs a new publication that will be used to send messages back to the client. Via the Image.sourceIdentity() method, Aeron provides us with both the source address of the client and the ephemeral port the client used to send the message we just received. However, we cannot send messages back to that ephemeral port: we need to send messages to the port the client specified in the HELLO message so that they are readable via the subscription the client created in EchoClient.setupSubscription() for that purpose.

When using UDP as a transport, the result of the sourceIdentity() call will be a string of the form ip-address:port. For a client at 10.10.1.100 using an arbitrary high-numbered ephemeral UDP port, the string may look something like 10.10.1.100:53618. The simplest way to parse a string of this form is to delegate parsing to the standard java.net.URI class. We do this by constructing a URI containing the original address and port [4], extracting the IP address from the resulting URI value, substituting the port specified by the client in the HELLO string, and then opening a new publication in a way that should now be familiar.

Assuming that all of this proceeds without issue, the client is moved to the CONNECTED state and the method returns. From that point on, any message received by that particular client instance will be sent back to the client via the newly created publication.
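
The address rewriting described above can be tried in isolation from Aeron. The following sketch takes a source identity string of the form seen in the server's log output and substitutes a new port. The SourceIdentity class and withPort method are hypothetical names, and converting the checked URISyntaxException into an IllegalArgumentException is our own choice for the sake of a compact example:

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper for rewriting "ip-address:port" strings. */
final class SourceIdentity
{
  private SourceIdentity() { }

  /** Replace the port in an "ip-address:port" string. */
  static String withPort(final String sourceIdentity, final int port)
  {
    try {
      // The scheme is a placeholder: it exists only so that java.net.URI
      // will parse the rest of the string as a host and port.
      final URI uri = new URI("fake://" + sourceIdentity);
      final String host = uri.getHost();
      if (host == null) {
        throw new IllegalArgumentException("no host in: " + sourceIdentity);
      }
      return host + ":" + port;
    } catch (final URISyntaxException e) {
      throw new IllegalArgumentException("unparseable: " + sourceIdentity, e);
    }
  }

  public static void main(final String[] args)
  {
    // The client sent from an ephemeral port but asked for replies on 8000.
    System.out.println(withPort("10.10.1.100:44501", 8000));
  }
}
```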

At this point, we appear to have a working client and server. An additional main method is added to the server to help with testing from the command line:

EchoServer.main()

  public static void main(
    final String[] args)
    throws Exception
  {
    if (args.length < 3) {
      LOG.error("usage: directory local-address local-port");
      System.exit(1);
    }

    final Path directory = Paths.get(args[0]);
    final InetAddress local_name = InetAddress.getByName(args[1]);
    final Integer local_port = Integer.valueOf(args[2]);

    final InetSocketAddress local_address =
      new InetSocketAddress(local_name, local_port.intValue());

    try (final EchoServer server = create(directory, local_address)) {
      server.run();
    }
  }

Executing the server and a client from the command line produces the expected output:

server$ java -classpath target/com.io7m.aeron-guide-0.0.1.jar com.io7m.aeron_guide.take1.EchoServer /tmp/aeron-server 10.10.1.100 9000
20:26:47.070 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - subscription URI: aeron:udp?endpoint=10.10.1.100:9000|reliable=true
20:28:09.981 [aeron-client-conductor] DEBUG com.io7m.aeron_guide.take1.EchoServer - onClientConnected: 10.10.1.100:44501
20:28:10.988 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - receive [0x896291375]: HELLO 8000
20:28:11.049 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - receive [0x896291375]: 2745822766
20:28:11.050 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - send: [session 0x562238613] 2745822766
20:28:11.953 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - receive [0x896291375]: 1016181810
20:28:11.953 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - send: [session 0x562238613] 1016181810
20:28:12.955 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - receive [0x896291375]: 296510575
20:28:12.955 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - send: [session 0x562238613] 296510575
20:28:13.957 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - receive [0x896291375]: 3276793170
20:28:13.957 [main] DEBUG com.io7m.aeron_guide.take1.EchoServer - send: [session 0x562238613] 3276793170

client$ java -classpath target/com.io7m.aeron-guide-0.0.1.jar com.io7m.aeron_guide.take1.EchoClient /tmp/aeron-client0 10.10.1.100 8000 10.10.1.100 9000
20:28:09.826 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - subscription URI: aeron:udp?endpoint=10.10.1.100:8000|reliable=true
20:28:09.846 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - publication URI: aeron:udp?endpoint=10.10.1.100:9000|reliable=true
20:28:10.926 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - send: HELLO 8000
20:28:10.927 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - send: 2745822766
20:28:11.928 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - send: 1016181810
20:28:11.933 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - response: 2745822766
20:28:12.934 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - send: 296510575
20:28:12.934 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - response: 1016181810
20:28:13.935 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - send: 3276793170
20:28:13.935 [main] DEBUG com.io7m.aeron_guide.take1.EchoClient - response: 296510575

The implementation works, but suffers from a number of weaknesses ranging from benign to potentially fatal.

3.3 Implementation Weaknesses

3.3.1 EchoClient Server Disconnections

The EchoClient implementation does not handle the case of the server either disconnecting or not existing in the first place. It will simply try to send messages forever and will ignore the fact that it never gets a response. Handling this was deliberately left out of the implementation for the sake of keeping the code as simple as possible. The way to fix this is twofold.

Firstly, the client can simply give up trying to send the initial HELLO message if no response has been received after a reasonable amount of time has elapsed. This solution can work well for protocols where the first message in the conversation is expected to be sent by the client. Aeron also indicates when a Publication is no longer connected by returning NOT_CONNECTED or CLOSED when calling offer() on the Publication. Real applications can react accordingly rather than just logging a failure message (like the EchoClient and EchoServer) and continuing.

Secondly, the client can specify image handlers on the subscription it creates in the same manner as the server. When an image becomes available, that means that the server has sent a message and is therefore presumably alive and willing to talk to the client. When an image becomes unavailable, the server is no longer willing or able to talk to the client. This can work well for protocols where the first message in the conversation is expected to be sent by the server.
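
As a rough sketch of what reacting to the result of offer() might look like, the following maps a result value to one of three actions. The numeric constants are an assumption: they are intended to mirror those documented on io.aeron.Publication and should be checked against the Aeron version in use (real code would reference the constants on Publication directly). BACK_PRESSURED and ADMIN_ACTION are not discussed in this guide so far; they are included on the assumption that they indicate transient conditions. The OfferResults class and Action enum are hypothetical names:

```java
/** Hypothetical classification of the values returned by offer(). */
final class OfferResults
{
  // Assumption: mirrors of the constants on io.aeron.Publication.
  static final long NOT_CONNECTED = -1L;
  static final long BACK_PRESSURED = -2L;
  static final long ADMIN_ACTION = -3L;
  static final long CLOSED = -4L;

  enum Action
  {
    SENT,         // the message was accepted
    RETRY,        // a transient condition: try again shortly
    DISCONNECTED  // the peer is gone or the publication is closed
  }

  private OfferResults() { }

  static Action classify(final long result)
  {
    if (result >= 0L) {
      return Action.SENT;
    }
    if (result == BACK_PRESSURED || result == ADMIN_ACTION) {
      return Action.RETRY;
    }
    // NOT_CONNECTED, CLOSED, or anything unrecognised.
    return Action.DISCONNECTED;
  }

  public static void main(final String[] args)
  {
    System.out.println(classify(64L));
    System.out.println(classify(BACK_PRESSURED));
    System.out.println(classify(NOT_CONNECTED));
  }
}
```

A client could use a DISCONNECTED result after a previously successful exchange as its cue to stop sending and shut down cleanly.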

3.3.2 MTU Handling Is Implicit

Sending messages over the open internet imposes an upper bound on the MTU that an application can use for individual messages.

As a general rule of thumb, the MTU for UDP packets sent over the open internet should be <= 1200 bytes. Aeron adds the further restriction that MTU values must be a multiple of 32.

The EchoClient and EchoServer implementations use UTF-8 strings that are not expected to be longer than about 16 bytes, and so assume that fragmentation will never occur and make no attempt to avoid it.
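As a small worked example of the two constraints above, the following sketch rounds a candidate MTU down to the nearest multiple of 32. The class and method names are hypothetical, and how the resulting value is handed to Aeron is deliberately not shown.

```java
final class MtuSketch
{
  /** The alignment stated in the rule of thumb above. */
  static final int MTU_ALIGNMENT = 32;

  private MtuSketch() { }

  /**
   * Round a candidate MTU down to the nearest multiple of 32.
   */
  static int alignDown(final int candidate)
  {
    if (candidate < MTU_ALIGNMENT) {
      throw new IllegalArgumentException("MTU too small: " + candidate);
    }
    return candidate - (candidate % MTU_ALIGNMENT);
  }

  public static void main(final String[] args)
  {
    // 1200 is not a multiple of 32; the nearest lower multiple is 37 * 32.
    System.out.println(alignDown(1200)); // prints 1184
  }
}
```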

3.3.3 Clients Sending Bad Messages Are Not Killed

The EchoServer implementation has a rather serious failing in that if the first message received from a client is not parseable as a simple HELLO <port> string, the offending client will never be told about this and the server will continue to process every subsequent message from the client as if it was an unparseable HELLO string.

This is partly a result of an underspecification of the protocol: The server has no way to tell a client that a fatal error has occurred and that the client should go away (or at least retry the HELLO string). This is also an implementation issue: The server has no means to forcibly disconnect a client (and as discussed previously, the client would not notice that it had been disconnected anyway).

3.3.4 Message Sending Is Not Robust

The way that messages are sent is insufficient in the sense that a failure to send a message is not a hard error. Real applications must be prepared to queue and retry messages as necessary, and should raise exceptions if messages absolutely cannot be sent after a reasonable number of attempts.

See Handling Back Pressure for details.
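A bounded retry loop of the kind described might look like the following sketch. It does not use Aeron directly: the LongSupplier stands in for a call to offer() on a Publication, under the assumption that a negative result indicates failure and a non-negative result indicates success. The class name, method name, and choice of exception are hypothetical.

```java
import java.util.function.LongSupplier;

final class SendRetrySketch
{
  private SendRetrySketch() { }

  /**
   * Call {@code offer} up to {@code maxAttempts} times, returning the first
   * non-negative result. Raise an exception if every attempt fails.
   */
  static long offerWithRetry(final LongSupplier offer, final int maxAttempts)
  {
    long lastResult = -1L;
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
      lastResult = offer.getAsLong();
      if (lastResult >= 0L) {
        return lastResult;
      }
      // Give other threads a chance to run before trying again.
      Thread.yield();
    }
    throw new IllegalStateException(
      "could not send after " + maxAttempts
        + " attempts (last result " + lastResult + ")");
  }

  public static void main(final String[] args)
  {
    // A stand-in publication that fails twice (with a hypothetical negative
    // failure code) and then accepts the message.
    final int[] calls = {0};
    final long position = offerWithRetry(() -> ++calls[0] < 3 ? -2L : 64L, 5);
    System.out.println("sent at position " + position);
  }
}
```

A real application would probably distinguish between transient failures, which are worth retrying, and results such as NOT_CONNECTED or CLOSED, which are not.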

3.3.5 Work Takes Place On Aeron Threads

Currently, all work takes place on threads controlled by Aeron. As per the documentation, Publication values are thread-safe, and Subscription values are not. Real applications should expect to take messages from a Subscription and place them into a queue for processing by one or more application threads 5 to avoid blocking the Aeron conductor threads. See Thread Utilisation for details.
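The hand-off can be sketched with a bounded queue from the standard library. This is an illustration only: the class HandoffSketch is hypothetical, and a plain byte array stands in for the buffer that Aeron would pass to a fragment handler.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class HandoffSketch
{
  private final BlockingQueue<String> inbox;

  HandoffSketch(final int capacity)
  {
    this.inbox = new ArrayBlockingQueue<>(capacity);
  }

  /**
   * Intended to be called on the thread that polls the subscription. The
   * bytes are copied out of the buffer and queued without blocking.
   *
   * @return false if the queue is full and the message was not accepted
   */
  boolean onFragment(final byte[] buffer, final int offset, final int length)
  {
    final String message =
      new String(buffer, offset, length, StandardCharsets.UTF_8);
    return this.inbox.offer(message);
  }

  /**
   * Intended to be called on an application thread.
   *
   * @return the next queued message, or null if nothing is pending
   */
  String nextMessage()
  {
    return this.inbox.poll();
  }

  public static void main(final String[] args)
  {
    final HandoffSketch handoff = new HandoffSketch(16);
    final byte[] data = "ECHO hello".getBytes(StandardCharsets.UTF_8);
    handoff.onFragment(data, 0, data.length);
    System.out.println(handoff.nextMessage());
  }
}
```

Copying each message into a String allocates memory, so this trades away some of Aeron's garbage-free behaviour for simplicity.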

3.3.6 EchoClient Cannot Be Behind NAT

This is the most serious issue with the implementation described so far (and astute programmers familiar with UDP networking will already have noticed): The implementation is fundamentally incompatible with Network Address Translation.

The server must be able to open connections directly to clients, and this is something that is not possible without clients enabling port forwarding on the NAT routers that they are inevitably sitting behind. The same is true of connections opened to the server by the clients, but this is less of an issue in that server operators are used to routinely enabling port forwarding on their routers to allow clients to connect in. For a multiplayer game with non-technical players running clients on the open internet, requiring each client to enable port forwarding just to be able to connect to a server would be unacceptable.

Additionally, having clients specify port information in an application-level protocol is distasteful. Anyone familiar with UDP programming can open a socket, bind it, and then read and write datagrams without even thinking about NAT: Routers will statefully match inbound datagrams to previously sent outbound datagrams and allow them to pass through unhindered. It seems unpleasant that Aeron would require us to give up this essentially OS-level functionality. Thankfully, Aeron includes a somewhat sparsely documented feature known as multi-destination-cast that can be used to traverse NAT systems reliably, removing the requirement for servers to connect directly back to clients.

4 A Client And Server (Take 2)

The serious issue that needs to be fixed with our implementation is the lack of ability to function with clients behind NAT. In order to handle this, we need to use a feature of Aeron known as multi-destination-cast (MDC). At first glance, this would appear to have nothing whatsoever to do with NAT: Essentially, it is a means to broadcast messages to a set of clients using only unicast UDP (as opposed to multicast UDP, which may or may not be available on the open internet). However, as will now be demonstrated, we can actually use MDC to get reliable NAT traversal.

4.1 Multi-Destination-Cast

To understand how multi-destination-cast solves our problem, consider what happens with a traditional non-Aeron program that uses UDP sockets to send datagrams directly:

NAT With Datagrams

The client machine maize has a private address 10.10.1.100 and sits behind a stateful packet filter and NAT router tomato. tomato has a private address of 10.10.1.1 and a public address 1.2.3.4. The server apricot has a public address of 5.6.7.8. The operating system on tomato is performing unidirectional NAT: The source addresses of packets sent outbound (from left to right on the diagram) from the private network 10.10.* will be rewritten so that they appear to be coming from the public address 1.2.3.4.

  1. maize opens a UDP socket and binds it to 10.10.1.100:9000, and then uses the socket to send a datagram p0 to port 8000 on the server apricot.

  2. The datagram p0 reaches tomato, which dutifully performs network address translation on the packet. The source address of p0 is rewritten to the public address of tomato and the source port is changed to an arbitrary high-numbered unused port, resulting in a packet rewrite(p0) with source 1.2.3.4:58138. The operating system on tomato records the fact that this rewrite took place, and then sends rewrite(p0) on its way to apricot.

  3. apricot receives rewrite(p0) and accepts it. A short time later, it sends a response packet p1 back to the source of rewrite(p0).

  4. The packet p1 reaches tomato and is inspected. The operating system on tomato sees that the packet has come from 5.6.7.8:8000 and is destined for 1.2.3.4:58138, and notices that it recently saw an outgoing packet that was destined for 5.6.7.8:8000 and that it rewrote the source address to 1.2.3.4:58138. It makes the assumption that this must be a packet sent in response to the packet it rewrote earlier, and so it rewrites the destination of p1 to be equal to the original source of p0 - 10.10.1.100:9000 - and sends it onward.

  5. The rewritten packet, rewrite(p1), is sent on its way and makes it safely to maize.

UDP is a connectionless protocol, but it’s possible to treat a stream of datagrams as a “connection” by keeping track of the source and destination addresses, and by remembering the recent address rewrites. This is standard practice on all modern NAT implementations and means that the vast majority of UDP applications can work correctly when the clients are behind NAT.
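The bookkeeping performed by tomato in the steps above can be modelled with a pair of maps. This is a deliberately simplified sketch: the class NatSketch is hypothetical, addresses are plain "host:port" strings, and rewrites never expire. Real NAT implementations differ in many details.

```java
import java.util.HashMap;
import java.util.Map;

final class NatSketch
{
  private final String publicAddress;
  private final Map<String, String> outbound = new HashMap<>();
  private final Map<String, String> inbound = new HashMap<>();
  private int nextPort;

  NatSketch(final String publicAddress, final int firstPort)
  {
    this.publicAddress = publicAddress;
    this.nextPort = firstPort;
  }

  /**
   * Rewrite the source of an outbound datagram and record the rewrite.
   *
   * @return the public source address that the remote side will see
   */
  String rewriteOutbound(final String privateSource, final String destination)
  {
    final String key = privateSource + ">" + destination;
    String publicSource = this.outbound.get(key);
    if (publicSource == null) {
      publicSource = this.publicAddress + ":" + this.nextPort;
      ++this.nextPort;
      this.outbound.put(key, publicSource);
      this.inbound.put(destination + ">" + publicSource, privateSource);
    }
    return publicSource;
  }

  /**
   * Look up the private destination for an inbound datagram.
   *
   * @return the private address, or null if no rewrite matches (the datagram is dropped)
   */
  String rewriteInbound(final String source, final String publicDestination)
  {
    return this.inbound.get(source + ">" + publicDestination);
  }

  public static void main(final String[] args)
  {
    final NatSketch tomato = new NatSketch("1.2.3.4", 58138);

    // p0: maize -> apricot, rewritten on the way out.
    final String seen = tomato.rewriteOutbound("10.10.1.100:9000", "5.6.7.8:8000");
    System.out.println(seen); // prints 1.2.3.4:58138

    // p1: apricot replies to the rewritten source and reaches maize.
    System.out.println(tomato.rewriteInbound("5.6.7.8:8000", seen)); // prints 10.10.1.100:9000

    // An unsolicited datagram to a port with no recorded rewrite is dropped.
    System.out.println(tomato.rewriteInbound("5.6.7.8:8000", "1.2.3.4:8000")); // prints null
  }
}
```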

The reason that our original EchoClient and EchoServer implementation cannot work behind NAT is that the client first sends a datagram d0 to the server and then, based on the data in the packet, the server tries to send a new and completely unrelated datagram d1 back to the port that the client specified. The NAT router obviously knows nothing of the application-level protocol between the client and server and so it can’t know about the port agreed upon by the client and server. It will not have a record of any recent rewrites that occurred for the port given in d1 (because none have occurred), and so the packet will simply be dropped when it reaches the router. As mentioned previously, streams in Aeron are strictly unidirectional and so we cannot construct an object (like a traditional UDP socket) that has both a single address and port, and that can both receive and send messages.

Aeron’s multi-destination-cast feature allows subscribers to add themselves as destinations to a publication in a manner that will work correctly when the subscribers are behind NAT. Essentially, a publication can declare an explicit control port that clients may use to register themselves as destinations for the publication. The publication will send data to all of the subscribers that have registered, and the underlying protocol for this is implemented such that the datagrams containing the messages written to the publication will be addressed to the source addresses and ports of the subscribers. This allows the data packets to be treated as ordinary response packets by NAT systems and, as such, they will successfully reach subscribers without issue.

Briefly, a server at 5.6.7.8 could make these two calls:

// aeron:udp?endpoint=5.6.7.8:9000|control=5.6.7.8:9001|control-mode=dynamic|reliable=true

final ConcurrentPublication server_pub =
  aeron.addPublication(
    new ChannelUriStringBuilder()
      .media("udp")
      .reliable(TRUE)
      .controlEndpoint("5.6.7.8:9001")
      .controlMode("dynamic")
      .endpoint("5.6.7.8:9000")
      .build(),
    SOME_STREAM_ID);

// aeron:udp?endpoint=5.6.7.8:9000|reliable=true

final Subscription server_sub =
  aeron.addSubscription(
    new ChannelUriStringBuilder()
      .media("udp")
      .reliable(TRUE)
      .endpoint("5.6.7.8:9000")
      .build(),
    SOME_STREAM_ID,
    image -> LOG.debug("server: a client has connected"),
    image -> LOG.debug("server: a client has disconnected"));

The first call creates a publication that listens for subscriptions on 5.6.7.8:9000 and also creates a control port on 5.6.7.8:9001. It specifies that it wants dynamic MDC, which means that clients will be automatically added to the publication as destinations. In contrast, specifying manual mode here would mean that clients need to be explicitly added by calling Publication.addDestination(). The created publication is used to send messages to all clients. Note that we specifically mean all clients; we have no way to address clients individually. This limitation will be addressed shortly.

The second call creates a subscription on 5.6.7.8:9000 that will be used to accept messages from clients. Note that although the publication and subscription both share the same 5.6.7.8:9000 endpoint, they are completely distinct streams: Messages written to server_pub will not be accessible from server_sub.

In both cases, the server uses an arbitrary application-specific stream ID named SOME_STREAM_ID. The client will need to use the same value.

Then, a client at 10.10.1.100 could make the following calls:

// aeron:udp?endpoint=5.6.7.8:9000|reliable=true

final ConcurrentPublication client_pub =
  aeron.addPublication(
    new ChannelUriStringBuilder()
      .media("udp")
      .reliable(TRUE)
      .endpoint("5.6.7.8:9000")
      .build(),
    SOME_STREAM_ID);

// aeron:udp?endpoint=10.10.1.100:8000|control=5.6.7.8:9001|control-mode=dynamic|reliable=true

final Subscription client_sub =
  aeron.addSubscription(
    new ChannelUriStringBuilder()
      .media("udp")
      .reliable(TRUE)
      .controlEndpoint("5.6.7.8:9001")
      .controlMode("dynamic")
      .endpoint("10.10.1.100:8000")
      .build(),
    SOME_STREAM_ID,
    image -> LOG.debug("client: connected to server"),
    image -> LOG.debug("client: disconnected from server"));

The first call creates a publication that will send messages to the server at 5.6.7.8:9000.

The second call creates a subscription that will create a local socket at 10.10.1.100:8000 that will be used to receive messages from the server, and will register itself dynamically as a destination by contacting the control socket on the server at 5.6.7.8:9001. If no local socket is specified here, the implementation will use an arbitrary high-numbered ephemeral port on whatever local IP address it thinks is appropriate.

As mentioned, one serious issue with the above is that the server only has a single publication that will send messages to all clients. This is less than useful if the requirement is to implement a protocol like our echo protocol where each client is supposed to have its own stream of messages and responses. The current solution to this in Aeron is to simply create multiple publications and then somehow arrange for each new connecting client to subscribe to a fresh publication intended just for that client. We now need to redesign the echo protocol to address this along with all of the other implementation weaknesses discussed so far.

4.2 Security Requirements

We assume a threat model that matches that of the average authoritative server-based (as opposed to peer-to-peer) online multiplayer game as of 2018. That is:

  • Clients speak directly to the server and never with each other. Messages from anything other than the server’s address must be ignored.

  • Client code is completely untrusted; clients may be running (possibly maliciously) modified code. We must assume that clients will send invalid messages to the server.

  • Clients can spoof packets such that their content can be arbitrary data and may appear to be coming from any address.

  • Clients cannot read packets sent by other clients to the server, and cannot read the packets sent by the server to other clients 6.

  • Malicious clients can and will try to take control of the server. Failing that, malicious clients can and will try to perform simple denial of service attacks on the server - just because they can.

4.3 Echo 2.0 Outline

The rough outline for our new protocol is that a client will first connect to the server and introduce itself. The server will then allocate a new publication and subscription pair (that we term a duologue) and tell the client how to reach them. The client will disconnect and then connect to the new duologue and start sending strings to be echoed as before. We’d like to add a way for the server to signal errors to the client and, given that we need to allocate new channels for each new client, we’d be well-advised to place some sort of configurable limit on the number of clients that can be connected at any given time 7.

The right way to implement this appears to be to have a client introduce itself to the server on an all clients channel, and then have the server redirect that client to a publication/subscription pair allocated for that client alone. The server broadcasts messages on the all clients channel and prefixes messages addressed to a specific client with the session ID of the target client.

When a client c subscribes to the all clients channel, it is assigned a (random) session ID s by Aeron automatically. The server allocates a new duologue q, and broadcasts a message that the client with session s should open a publication and subscription to the port number of q. The client then unsubscribes from the all clients channel and opens a publication and subscription to the port number of q. The client and server can then exchange messages on a one-to-one basis via q, with the server deleting q when the client disconnects.

Of course, a malicious client could simply create lots of subscriptions to the all clients channel and keep requesting new duologues until the server either runs out of available UDP ports or the maximum number of clients for the server is reached. It’s therefore necessary to simply refuse to create more than n new duologues for a client at any given IP address. Of course, this raises a further issue: What if a client c spoofs an endless stream of requests to create new duologues but does so by spoofing the IP address of another client d in the requests? The server will wrongfully punish d for the requests, and will in fact stop d from being able to request its own duologues.

To combat both of these issues, the server will expire a duologue if no client has connected to it within a configurable time limit. Of course, a malicious client could try to request the maximum number of duologues by spoofing requests and impersonating d, and could then spoof further packets, connect to those duologues, and prevent them from expiring. This would essentially lock d out of the server for as long as the server continues running. This is difficult but not impossible for a determined attacker to achieve. To combat this, the server needs to make it difficult for anyone other than a well-behaved client to actually connect to a requested duologue. Essentially, the server must share a secret with a requesting client, and that client must then present that same secret to the server when connecting to the duologue.

The solution to this is fairly simple. The client sends a 4-byte one-time pad to the server when it connects to the all clients channel. By our intended threat model, only the server is capable of reading this one-time pad. When the server creates a new duologue, it chooses the session ID for the duologue ahead of time (rather than allowing Aeron to pick one itself), encrypts this session ID with the client’s one-time pad, and sends the encrypted ID as part of the response given to the client when directing it to the new duologue. The client decrypts the session ID and then uses it when connecting to the duologue. Aeron will automatically reject 8 any client that tries to connect with an incorrect session ID; it’s not necessary to deal with invalid sessions at the application level.

In effect, this means that a malicious client that is trying to interfere with the ability of another client to connect must:

  1. Listen for messages in the all clients channel indicating that the server has just created a new duologue.

  2. Somehow guess the source IP address of the client that created the duologue. In the case of IPv4, this would involve guessing the right address from slightly more than 2 ^ 31 addresses.

  3. Somehow guess the required session ID. Identifiers are randomly selected from a pool of 2 ^ 31 possible values.

  4. Spoof enough packets implementing the underlying Aeron protocol to set up the required publication to the duologue, including the hard-to-guess session ID, and then spoof more packets to keep the connection alive.

This would appear to be next to impossible to achieve reliably. It would be easier for the attacker to simply attempt some sort of distributed denial-of-service attack (against which there is obviously no protocol-level protection possible).
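The key exchange described in this section amounts to a pair of exclusive-or operations. The following sketch shows both directions under the assumption that keys and encrypted session IDs travel as uppercase hexadecimal strings; the class and method names are hypothetical, and the key and session ID in the worked example are arbitrary.

```java
import java.security.SecureRandom;

final class SessionPadSketch
{
  private SessionPadSketch() { }

  /**
   * Server side: encrypt a chosen session ID with the client's key.
   */
  static String encryptSession(final int sessionId, final String keyHex)
  {
    final int key = Integer.parseUnsignedInt(keyHex, 16);
    return Integer.toUnsignedString(key ^ sessionId, 16).toUpperCase();
  }

  /**
   * Client side: recover the session ID using the same key.
   */
  static int decryptSession(final String encryptedHex, final String keyHex)
  {
    final int key = Integer.parseUnsignedInt(keyHex, 16);
    return Integer.parseUnsignedInt(encryptedHex, 16) ^ key;
  }

  public static void main(final String[] args)
  {
    // A fixed example: 0xDEADBEEF ^ 0x01020304 = 0xDFAFBDEB.
    System.out.println(encryptSession(0x01020304, "DEADBEEF")); // prints DFAFBDEB

    // A round trip with a freshly generated random key.
    final String key =
      Integer.toUnsignedString(new SecureRandom().nextInt(), 16).toUpperCase();
    final String encrypted = encryptSession(0x01020304, key);
    System.out.println(decryptSession(encrypted, key) == 0x01020304); // prints true
  }
}
```

Because exclusive-or is its own inverse, the same key both encrypts and decrypts; the scheme relies on each key being used only once.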

4.4 Echo 2.0 Specification

All messages are UTF-8 encoded strings.

The client:

  1. Must open a publication p and a subscription s to the server, using dynamic MDC on the subscription, and must send the string HELLO <key> where <key> is a random unsigned 32-bit hexadecimal value 9 encoded as a string.

  2. Must wait until the server sends a response string of either the form <session> CONNECT <port> <control-port> <encrypted-session> or <session> ERROR <message>.

  • If the response is of the form <session> ERROR ..., but <session> does not match the client’s current session ID, the response should be ignored and the client should continue waiting.
  • If the response is of the form <session> ERROR <message> and <session> matches the client’s current session ID, the client must assume that <message> is an informative error message, log the message, and exit.
  • If the response is of the form <session> CONNECT ..., but <session> does not match the client’s current session ID, the response should be ignored and the client should continue waiting.
  • If the response is of the form <session> CONNECT <port> <control-port> <encrypted-session> and <session> matches the client’s current session ID:
    • The client must decrypt <encrypted-session> by evaluating k = <encrypted-session> ^ <key>, where <key> is the value from step 1.
    • The client must create a subscription t to port <port> on the server, using <control-port> for MDC and with an explicit session ID k.
    • The client must create a publication u to port <port> on the server with an explicit session ID k.
    • The client must close s and p, and progress to step 3.
  • If the server fails to return an acceptable response in a reasonable amount of time, the client must log an error message and exit.
  3. Must send messages of the form ECHO <message> to u, listening for responses on t.
  • If the response is of the form ERROR <message>, the client should assume that <message> is an informative error message, log the message, and exit.
  • If the response is of the form ECHO <message>, the client should assume that <message> is one of the strings it previously sent, and return to step 3.
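The client's handling of responses in step 2 can be sketched with a pair of regular expressions. The specification above does not fix the textual encoding of each field, so this sketch assumes decimal session IDs and port numbers and an uppercase hexadecimal <encrypted-session>; the class, enum, and pattern names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ConnectResponseSketch
{
  enum Kind { IGNORED, ERROR, CONNECT }

  // Assumed encodings: decimal session IDs and ports, hexadecimal payload.
  private static final Pattern PATTERN_ERROR =
    Pattern.compile("^(-?[0-9]{1,10}) ERROR (.*)$");
  private static final Pattern PATTERN_CONNECT =
    Pattern.compile(
      "^(-?[0-9]{1,10}) CONNECT ([0-9]{1,5}) ([0-9]{1,5}) ([0-9A-F]{1,8})$");

  private ConnectResponseSketch() { }

  /**
   * Decide what a message on the all clients channel means for the client
   * with session ID {@code ownSession}. Messages addressed to other
   * sessions, and unrecognized messages, are ignored.
   */
  static Kind classify(final String message, final int ownSession)
  {
    final Matcher error = PATTERN_ERROR.matcher(message);
    if (error.matches()) {
      return isOwn(error.group(1), ownSession) ? Kind.ERROR : Kind.IGNORED;
    }
    final Matcher connect = PATTERN_CONNECT.matcher(message);
    if (connect.matches()) {
      return isOwn(connect.group(1), ownSession) ? Kind.CONNECT : Kind.IGNORED;
    }
    return Kind.IGNORED;
  }

  /**
   * Evaluate k = encrypted-session ^ key for a CONNECT message.
   */
  static int decryptSession(final String message, final int key)
  {
    final Matcher connect = PATTERN_CONNECT.matcher(message);
    if (!connect.matches()) {
      throw new IllegalArgumentException("not a CONNECT message: " + message);
    }
    return Integer.parseUnsignedInt(connect.group(4), 16) ^ key;
  }

  private static boolean isOwn(final String text, final int ownSession)
  {
    try {
      return Integer.parseInt(text) == ownSession;
    } catch (final NumberFormatException e) {
      return false;
    }
  }

  public static void main(final String[] args)
  {
    final String message = "42 CONNECT 9001 9002 DFAFBDEB";
    System.out.println(classify(message, 42)); // prints CONNECT
    System.out.println(classify(message, 7));  // prints IGNORED
    System.out.println(
      Integer.toHexString(decryptSession(message, 0xDEADBEEF))); // prints 1020304
  }
}
```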

The server:

  1. Must open a publication p that will be used to send initial messages to clients when they connect. It must provide a control port so that clients can use dynamic MDC.

  2. Must create a subscription s that will be used to receive initial messages from clients.

The server stores an initially empty list of duologues. A duologue is a structure comprising the following state:

record Duologue(
  InetAddress owner,
  int session_id,
  Publication pub,
  Subscription sub);

When a message is received on s:

  • If the message is of the form HELLO <key>
    • If the size of the list of duologues is n:
      • The server must write a message of the form <session-id> ERROR server full to p, where <session-id> is the session ID of the client that sent the message, and return to waiting for messages.
    • If there are at least m existing duologues owned by the IP address of the client that sent the message, where m is a configurable value:
      • The server must write a message of the form <session-id> ERROR too many connections for IP address to p, where <session-id> is the session ID of the client that sent the message, and return to waiting for messages.
    • Otherwise:
      • The server must record (a, z, t, u) into the list of duologues, where t and u are a freshly allocated publication and subscription, respectively, a is the IP address of the client, and z is a freshly allocated session ID.
      • The server must write a message <session-id> CONNECT <port> <control-port> <encrypted-session> to p, where <port> and <control-port> are the port numbers of t and u, <encrypted-session> is z ^ <key>, and <session-id> is the session ID of the client that sent the message.
      • The server must return to waiting for messages.
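The admission checks for a HELLO message can be sketched independently of Aeron. In the following illustration, the class AdmissionSketch is hypothetical, client addresses are plain strings, and the two limits correspond to n and m in the rules above. The allocation of the publication, subscription, and session ID is omitted.

```java
import java.util.HashMap;
import java.util.Map;

final class AdmissionSketch
{
  private final int maximumClients;
  private final int maximumPerAddress;
  private final Map<String, Integer> perAddress = new HashMap<>();
  private int total;

  AdmissionSketch(final int maximumClients, final int maximumPerAddress)
  {
    this.maximumClients = maximumClients;
    this.maximumPerAddress = maximumPerAddress;
  }

  /**
   * Apply the checks in the order given by the specification.
   *
   * @return null if a duologue may be created, otherwise the error text
   */
  String tryAdmit(final String address)
  {
    if (this.total >= this.maximumClients) {
      return "server full";
    }
    final int existing = this.perAddress.getOrDefault(address, 0);
    if (existing >= this.maximumPerAddress) {
      return "too many connections for IP address";
    }
    this.perAddress.put(address, existing + 1);
    ++this.total;
    return null;
  }

  /**
   * Record that a duologue owned by the given address has been deleted.
   */
  void release(final String address)
  {
    final int existing = this.perAddress.getOrDefault(address, 0);
    if (existing == 0) {
      return;
    }
    if (existing == 1) {
      this.perAddress.remove(address);
    } else {
      this.perAddress.put(address, existing - 1);
    }
    --this.total;
  }

  public static void main(final String[] args)
  {
    final AdmissionSketch admission = new AdmissionSketch(3, 2);
    System.out.println(admission.tryAdmit("1.2.3.4")); // prints null
    System.out.println(admission.tryAdmit("1.2.3.4")); // prints null
    System.out.println(admission.tryAdmit("1.2.3.4")); // prints too many connections for IP address
  }
}
```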

When a message is received on the subscription u of a duologue i:

  • If the message is of the form ECHO <message>
    • The server must write a message of the form ECHO <message> to t.
  • Otherwise:
    • The server must write a message of the form ERROR bad message to t.
    • The server must close t and u.
    • The server must delete the duologue.

Finally, the server should delete duologues that have no subscribers after the arbitrary configurable time limit has expired.
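The expiry rule can be sketched as a periodic sweep over the list of duologues. This is an illustration under simplifying assumptions: the class DuologueExpirySketch is hypothetical, times are plain millisecond values supplied by the caller, and a boolean flag stands in for checking whether the duologue currently has subscribers. A real server would also close the underlying publication and subscription of each expired duologue.

```java
import java.util.HashMap;
import java.util.Map;

final class DuologueExpirySketch
{
  private static final class Pending
  {
    private final long createdAtMillis;
    private boolean connected;

    Pending(final long createdAtMillis)
    {
      this.createdAtMillis = createdAtMillis;
    }
  }

  private final Map<Integer, Pending> duologues = new HashMap<>();
  private final long limitMillis;

  DuologueExpirySketch(final long limitMillis)
  {
    this.limitMillis = limitMillis;
  }

  void allocate(final int sessionId, final long nowMillis)
  {
    this.duologues.put(Integer.valueOf(sessionId), new Pending(nowMillis));
  }

  void markConnected(final int sessionId)
  {
    final Pending pending = this.duologues.get(Integer.valueOf(sessionId));
    if (pending != null) {
      pending.connected = true;
    }
  }

  /**
   * Delete every duologue that has had no client connect within the limit.
   *
   * @return the number of duologues deleted
   */
  int expire(final long nowMillis)
  {
    final int before = this.duologues.size();
    this.duologues.values().removeIf(
      d -> !d.connected && nowMillis - d.createdAtMillis >= this.limitMillis);
    return before - this.duologues.size();
  }

  int size()
  {
    return this.duologues.size();
  }

  public static void main(final String[] args)
  {
    final DuologueExpirySketch sketch = new DuologueExpirySketch(1000L);
    sketch.allocate(1, 0L);
    sketch.allocate(2, 0L);
    sketch.markConnected(2);
    System.out.println(sketch.expire(500L));  // prints 0
    System.out.println(sketch.expire(1000L)); // prints 1
  }
}
```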

4.5 Echo 2.0 Server

We start by examining the server implementation.

The create method is mostly unchanged. The main difference is that we tell the media driver that we are going to be manually assigning session IDs in the range [EchoSessions.RESERVED_SESSION_ID_LOW, EchoSessions.RESERVED_SESSION_ID_HIGH] ourselves. When the media driver automatically assigns session IDs, it must use values outside of this range to avoid conflict with any that we assign ourselves. We also initialize a value of type EchoServerExecutorService.

EchoServer.create()

  public static EchoServer create(
    final Clock clock,
    final EchoServerConfiguration configuration)
    throws EchoServerException
  {
    Objects.requireNonNull(clock, "clock");
    Objects.requireNonNull(configuration, "configuration");

    final String directory =
      configuration.baseDirectory().toAbsolutePath().toString();

    final MediaDriver.Context media_context =
      new MediaDriver.Context()
        .dirDeleteOnStart(true)
        .publicationReservedSessionIdLow(EchoSessions.RESERVED_SESSION_ID_LOW)
        .publicationReservedSessionIdHigh(EchoSessions.RESERVED_SESSION_ID_HIGH)
        .aeronDirectoryName(directory);

    final Aeron.Context aeron_context =
      new Aeron.Context()
        .aeronDirectoryName(directory);

    EchoServerExecutorService exec = null;
    try {
      exec = EchoServerExecutor.create();

      MediaDriver media_driver = null;
      try {
        media_driver = MediaDriver.launch(media_context);

        Aeron aeron = null;
        try {
          aeron = Aeron.connect(aeron_context);
        } catch (final Exception e) {
          closeIfNotNull(aeron);
          throw e;
        }

        return new EchoServer(clock, exec, media_driver, aeron, configuration);
      } catch (final Exception e) {
        closeIfNotNull(media_driver);
        throw e;
      }
    } catch (final Exception e) {
      try {
        closeIfNotNull(exec);
      } catch (final Exception c_ex) {
        e.addSuppressed(c_ex);
      }
      throw new EchoServerCreationException(e);
    }
  }

As can be seen, the multiple parameters that were passed to the create() method in the original implementation have been moved into an immutable EchoServerConfiguration type. The implementation of this type is generated automatically by the immutables.org annotation processor.

EchoServerConfiguration

public interface EchoServerConfiguration
{
  /**
   * @return The base directory that will be used for the server; should be unique for each server instance
   */

  @Value.Parameter
  Path baseDirectory();

  /**
   * @return The local address to which the server will bind
   */

  @Value.Parameter
  InetAddress localAddress();

  /**
   * @return The port that the server will use for client introductions
   */

  @Value.Parameter
  int localInitialPort();

  /**
   * @return The dynamic MDC control port that the server will use for client introductions
   */

  @Value.Parameter
  int localInitialControlPort();

  /**
   * @return The base port that will be used for individual client duologues
   */

  @Value.Parameter
  int localClientsBasePort();

  /**
   * @return The maximum number of clients that will be allowed on the server
   */

  @Value.Parameter
  int clientMaximumCount();

  /**
   * @return The maximum number of duologues that will be allowed per remote client IP address
   */

  @Value.Parameter
  int maximumConnectionsPerAddress();
}

The EchoServerExecutorService type is effectively a single-threaded java.util.concurrent.Executor with some extra methods added that code can use to check that it is indeed running on the correct thread. The class is used to ensure that long-running work is not performed on threads owned by Aeron (addressing the noted threading issues), and to ensure that access to the server’s internal state is strictly single-threaded. The implementation of this class is fairly unsurprising:

EchoServerExecutorService

public interface EchoServerExecutorService extends AutoCloseable, Executor
{
  /**
   * @return {@code true} if the caller of this method is running on the executor thread
   */

  boolean isExecutorThread();

  /**
   * Raise {@link IllegalStateException} iff {@link #isExecutorThread()} would
   * currently return {@code false}.
   */

  default void assertIsExecutorThread()
  {
    if (!this.isExecutorThread()) {
      throw new IllegalStateException(
        "The current thread is not a server executor thread");
    }
  }
}

EchoServerExecutor

public final class EchoServerExecutor implements EchoServerExecutorService
{
  private static final Logger LOG =
    LoggerFactory.getLogger(EchoServerExecutor.class);

  private final ExecutorService executor;

  private EchoServerExecutor(
    final ExecutorService in_exec)
  {
    this.executor = Objects.requireNonNull(in_exec, "exec");
  }

  @Override
  public boolean isExecutorThread()
  {
    return Thread.currentThread() instanceof EchoServerThread;
  }

  @Override
  public void execute(final Runnable runnable)
  {
    Objects.requireNonNull(runnable, "runnable");

    this.executor.submit(() -> {
      try {
        runnable.run();
      } catch (final Throwable e) {
        LOG.error("uncaught exception: ", e);
      }
    });
  }

  @Override
  public void close()
  {
    this.executor.shutdown();
  }

  private static final class EchoServerThread extends Thread
  {
    EchoServerThread(final Runnable target)
    {
      super(Objects.requireNonNull(target, "target"));
    }
  }

  /**
   * @return A new executor
   */

  public static EchoServerExecutor create()
  {
    final ThreadFactory factory = r -> {
      final EchoServerThread t = new EchoServerThread(r);
      t.setName(new StringBuilder(64)
                  .append("com.io7m.aeron_guide.take2.server[")
                  .append(Long.toUnsignedString(t.getId()))
                  .append("]")
                  .toString());
      return t;
    };

    return new EchoServerExecutor(Executors.newSingleThreadExecutor(factory));
  }
}

We litter the code with calls to assertIsExecutorThread() to help catch thread misuse early.

The majority of the interesting work that the server does is now performed by a static inner class called ClientState. This class is responsible for accepting requests from clients, checking access restrictions (such as enforcing the limit on duologues by a single IP address), polling existing duologues for activity, and so on. We establish a rule that access to the ClientState class is confined to a single thread via the EchoServerExecutor type. The EchoServer class defines three methods that each essentially delegate to the ClientState class:

EchoServer.onInitialClientConnected()

  private void onInitialClientConnected(
    final Image image)
  {
    this.executor.execute(() -> {
      LOG.debug(
        "[{}] initial client connected ({})",
        Integer.toString(image.sessionId()),
        image.sourceIdentity());

      this.clients.onInitialClientConnected(
        image.sessionId(),
        EchoAddresses.extractAddress(image.sourceIdentity()));
    });
  }

EchoServer.onInitialClientDisconnected()

  private void onInitialClientDisconnected(
    final Image image)
  {
    this.executor.execute(() -> {
      LOG.debug(
        "[{}] initial client disconnected ({})",
        Integer.toString(image.sessionId()),
        image.sourceIdentity());

      this.clients.onInitialClientDisconnected(image.sessionId());
    });
  }

EchoServer.onInitialClientMessage()

  private void onInitialClientMessage(
    final Publication publication,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
  {
    final String message =
      EchoMessages.parseMessageUTF8(buffer, offset, length);

    final String session_name =
      Integer.toString(header.sessionId());
    final Integer session_boxed =
      Integer.valueOf(header.sessionId());

    this.executor.execute(() -> {
      try {
        this.clients.onInitialClientMessageProcess(
          publication,
          session_name,
          session_boxed,
          message);
      } catch (final Exception e) {
        LOG.error("could not process client message: ", e);
      }
    });
  }

The onInitialClientConnected() and onInitialClientDisconnected() methods are called as image handlers by the subscription the server configures for the all clients channel. The onInitialClientMessage() method is called when a client sends a message on the all clients channel.

The ClientState.onInitialClientMessageProcess() method does the majority of the work required when a client requests the creation of a new duologue:

EchoServer.ClientState.onInitialClientMessageProcess()

    void onInitialClientMessageProcess(
      final Publication publication,
      final String session_name,
      final Integer session_boxed,
      final String message)
      throws EchoServerException, IOException
    {
      this.exec.assertIsExecutorThread();

      LOG.debug("[{}] received: {}", session_name, message);

      /*
       * The HELLO command is the only acceptable message from clients
       * on the all-clients channel.
       */

      final Matcher hello_matcher = PATTERN_HELLO.matcher(message);
      if (!hello_matcher.matches()) {
        EchoMessages.sendMessage(
          publication,
          this.send_buffer,
          errorMessage(session_name, "bad message"));
        return;
      }

      /*
       * Check to see if there are already too many clients connected.
       */

      if (this.client_duologues.size() >= this.configuration.clientMaximumCount()) {
        LOG.debug("server is full");
        EchoMessages.sendMessage(
          publication,
          this.send_buffer,
          errorMessage(session_name, "server full"));
        return;
      }

      /*
       * Check to see if this IP address already has the maximum number of
       * duologues allocated to it.
       */

      final InetAddress owner =
        this.client_session_addresses.get(session_boxed);

      if (this.address_counter.countFor(owner) >=
        this.configuration.maximumConnectionsPerAddress()) {
        LOG.debug("too many connections for IP address");
        EchoMessages.sendMessage(
          publication,
          this.send_buffer,
          errorMessage(session_name, "too many connections for IP address"));
        return;
      }

      /*
       * Parse the one-time pad with which the client wants the server to
       * encrypt the identifier of the session that will be created.
       */

      final int duologue_key =
        Integer.parseUnsignedInt(hello_matcher.group(1), 16);

      /*
       * Allocate a new duologue, encrypt the resulting session ID, and send
       * a message to the client telling it where to find the new duologue.
       */

      final EchoServerDuologue duologue =
        this.allocateNewDuologue(session_name, session_boxed, owner);

      final String session_crypt =
        Integer.toUnsignedString(duologue_key ^ duologue.session(), 16)
          .toUpperCase();

      EchoMessages.sendMessage(
        publication,
        this.send_buffer,
        connectMessage(
          session_name,
          duologue.portData(),
          duologue.portControl(),
          session_crypt));
    }
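The session ID is masked with a plain XOR against the client's key, so the client recovers it by applying the same key again. The following is a minimal sketch of that round trip using only the JDK. The class SessionPadSketch and its method names are hypothetical and are not part of the echo server; the only assumption carried over from the listing above is that both values travel as uppercase hexadecimal text.

```java
import java.security.SecureRandom;

final class SessionPadSketch
{
  private SessionPadSketch() { }

  /** Server side: mask the session ID with the client's key. */
  static String encrypt(final String key_hex, final int session)
  {
    final int key = Integer.parseUnsignedInt(key_hex, 16);
    return Integer.toUnsignedString(key ^ session, 16).toUpperCase();
  }

  /** Client side: XOR with the same key to recover the session ID. */
  static int decrypt(final String key_hex, final String session_crypt)
  {
    final int key = Integer.parseUnsignedInt(key_hex, 16);
    return key ^ Integer.parseUnsignedInt(session_crypt, 16);
  }

  public static void main(final String[] args)
  {
    // The client picks a fresh random key for each HELLO it sends.
    final int key = new SecureRandom().nextInt();
    final String key_hex = Integer.toUnsignedString(key, 16).toUpperCase();

    final int session = 0x2A5F01;
    final String session_crypt = encrypt(key_hex, session);
    System.out.println(decrypt(key_hex, session_crypt) == session);
  }
}
```

The unsigned parse and format calls matter here: a key or masked value with the top bit set would otherwise be rendered with a leading minus sign.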

The ClientState.allocateNewDuologue() method performs the allocation itself: it increments the number of duologues assigned to the IP address of the client, allocates UDP port numbers and a session ID, and then calls EchoServerDuologue.create(), which creates the underlying Aeron publications and subscriptions:

EchoServer.ClientState.allocateNewDuologue()

    private EchoServerDuologue allocateNewDuologue(
      final String session_name,
      final Integer session_boxed,
      final InetAddress owner)
      throws
      EchoServerPortAllocationException,
      EchoServerSessionAllocationException
    {
      this.address_counter.increment(owner);

      final EchoServerDuologue duologue;
      try {
        final int[] ports = this.port_allocator.allocate(2);
        try {
          final int session = this.session_allocator.allocate();
          try {
            duologue =
              EchoServerDuologue.create(
                this.aeron,
                this.clock,
                this.exec,
                this.configuration.localAddress(),
                owner,
                session,
                ports[0],
                ports[1]);
            LOG.debug("[{}] created new duologue", session_name);
            this.client_duologues.put(session_boxed, duologue);
          } catch (final Exception e) {
            this.session_allocator.free(session);
            throw e;
          }
        } catch (final Exception e) {
          // Release the ports if either session allocation or duologue
          // creation failed.
          this.port_allocator.free(ports[0]);
          this.port_allocator.free(ports[1]);
          throw e;
        }
      } catch (final Exception e) {
        // Any failure means no duologue was created for this address.
        this.address_counter.decrement(owner);
        throw e;
      }
      return duologue;
    }
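The nested try blocks above undo earlier allocations when a later step fails. An alternative way to express the same idea, sketched here with hypothetical names that do not appear in the echo server, is to record an undo action for each step that succeeds and to run those actions in reverse order on failure. This is an illustration of the pattern, not a replacement for the listing above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class RollbackSketch
{
  // Undo actions for completed steps, most recent first.
  private final Deque<Runnable> undo = new ArrayDeque<>();

  /** Run a step and, if it succeeds, remember how to release its result. */
  <T> T step(final Supplier<T> acquire, final Consumer<T> release)
  {
    final T value = acquire.get();
    this.undo.push(() -> release.accept(value));
    return value;
  }

  /** Release everything acquired so far, in reverse order of acquisition. */
  void rollback()
  {
    while (!this.undo.isEmpty()) {
      this.undo.pop().run();
    }
  }
}
```

A caller would wrap its sequence of step() calls in a single try block and call rollback() in the catch block before rethrowing, which keeps the release logic next to the matching acquisition.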

The ClientState.onInitialClientConnected() and ClientState.onInitialClientDisconnected() methods merely record and discard, respectively, the IP address associated with each session ID, for later use in the ClientState.onInitialClientMessageProcess() method:

EchoServer.ClientState.onInitialClientConnected()

    void onInitialClientConnected(
      final int session_id,
      final InetAddress client_address)
    {
      this.exec.assertIsExecutorThread();

      this.client_session_addresses.put(
        Integer.valueOf(session_id), client_address);
    }

EchoServer.ClientState.onInitialClientDisconnected()

    void onInitialClientDisconnected(
      final int session_id)
    {
      this.exec.assertIsExecutorThread();

      this.client_session_addresses.remove(Integer.valueOf(session_id));
    }
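The address_counter consulted by onInitialClientMessageProcess() and updated by allocateNewDuologue() is not shown in this section. A minimal sketch of what such a per-address counter might look like follows; the class name AddressCounterSketch is hypothetical, and only the method names countFor(), increment(), and decrement() are taken from the calls in the listings above.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

final class AddressCounterSketch
{
  private final Map<InetAddress, Integer> counts = new HashMap<>();

  /** @return The number of duologues currently held by the address */
  int countFor(final InetAddress address)
  {
    return this.counts.getOrDefault(address, 0);
  }

  /** Record one more duologue for the address. */
  int increment(final InetAddress address)
  {
    return this.counts.merge(address, 1, Integer::sum);
  }

  /** Record one fewer duologue for the address; never goes below zero. */
  int decrement(final InetAddress address)
  {
    // Returning null removes the entry, so the map does not grow with
    // every address the server has ever seen.
    final Integer next =
      this.counts.computeIfPresent(address, (k, v) -> v > 1 ? v - 1 : null);
    return next == null ? 0 : next;
  }
}
```

Because all of the ClientState methods assert that they run on the executor thread, a plain HashMap is sufficient in this sketch; no synchronization is attempted.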

The allocation of UDP port numbers for new duologues is handled by the EchoServerPortAllocator class. This is a simple class that will randomly allocate integer values from a configurable range and will not re-use integer values until they have been explicitly freed. In our implementation, we require 2 UDP ports for every client, have a configurable limit of c clients, and have a configurable base port p. We therefore set the range of ports to [p, p + (2 * c)).

EchoServerPortAllocator

public final class EchoServerPortAllocator
{
  private final int port_lo;
  private final int port_hi;
  private final IntHashSet ports_used;
  private final IntArrayList ports_free;

  /**
   * Create a new port allocator.
   *
   * @param port_base The base port
   * @param max_ports The maximum number of ports that will be allocated
   *
   * @return A new port allocator
   */

  public static EchoServerPortAllocator create(
    final int port_base,
    final int max_ports)
  {
    return new EchoServerPortAllocator(port_base, max_ports);
  }

  private EchoServerPortAllocator(
    final int in_port_lo,
    final int in_max_ports)
  {
    if (in_port_lo <= 0 || in_port_lo >= 65536) {
      throw new IllegalArgumentException(
        String.format(
          "Base port %d must be in the range [1, 65535]",
          Integer.valueOf(in_port_lo)));
    }

    this.port_lo = in_port_lo;
    this.port_hi = in_port_lo + (in_max_ports - 1);

    if (this.port_hi < 0 || this.port_hi >= 65536) {
      throw new IllegalArgumentException(
        String.format(
          "Uppermost port %d must be in the range [1, 65535]",
          Integer.valueOf(this.port_hi)));
    }

    this.ports_used = new IntHashSet(in_max_ports);
    this.ports_free = new IntArrayList();

    for (int port = in_port_lo; port <= this.port_hi; ++port) {
      this.ports_free.addInt(port);
    }
    Collections.shuffle(this.ports_free);
  }

  /**
   * Free a given port. Has no effect if the given port is outside of the range
   * considered by the allocator.
   *
   * @param port The port
   */

  public void free(
    final int port)
  {
    if (port >= this.port_lo && port <= this.port_hi) {
      this.ports_used.remove(port);
      this.ports_free.addInt(port);
    }
  }

  /**
   * Allocate {@code count} ports.
   *
   * @param count The number of ports that will be allocated
   *
   * @return An array of allocated ports
   *
   * @throws EchoServerPortAllocationException If there are fewer than {@code count} ports available to allocate
   */

  public int[] allocate(
    final int count)
    throws EchoServerPortAllocationException
  {
    if (this.ports_free.size() < count) {
      throw new EchoServerPortAllocationException(
        String.format(
          "Too few ports available to allocate %d ports",
          Integer.valueOf(count)));
    }

    final int[] result = new int[count];
    for (int index = 0; index < count; ++index) {
      result[index] = this.ports_free.remove(0).intValue();
      this.ports_used.add(result[index]);
    }

    return result;
  }
}
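To make the range arithmetic concrete: with a base port p = 9000 and a limit of c = 4 clients, the allocator manages the eight ports 9000 to 9007 inclusive. The following sketch reproduces the same idea using only JDK collections, so that it can be run without Agrona. The class PortRangeSketch and its methods are hypothetical, and unlike the class above it takes the client count rather than the port count as its argument.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PortRangeSketch
{
  private final List<Integer> free = new ArrayList<>();
  private final Set<Integer> used = new HashSet<>();

  PortRangeSketch(final int base_port, final int max_clients)
  {
    // Two ports per client: the half-open range [p, p + (2 * c)).
    for (int port = base_port; port < base_port + (2 * max_clients); ++port) {
      this.free.add(port);
    }
    Collections.shuffle(this.free);
  }

  /** Allocate a (data, control) pair of ports. */
  int[] allocatePair()
  {
    if (this.free.size() < 2) {
      throw new IllegalStateException("Too few ports available");
    }
    final int[] pair = new int[2];
    for (int index = 0; index < 2; ++index) {
      pair[index] = this.free.remove(this.free.size() - 1);
      this.used.add(pair[index]);
    }
    return pair;
  }

  /** Return a port to the pool; freeing an unused port has no effect. */
  void free(final int port)
  {
    if (this.used.remove(port)) {
      this.free.add(port);
    }
  }

  int available()
  {
    return this.free.size();
  }
}
```

The sketch only returns a port to the free list if it was actually in use, which guards against the same port being handed out twice after an accidental double free.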

Similarly, the allocation of session IDs for new duologues is handled by the EchoServerSessionAllocator class 10:

EchoServerSessionAllocator

public final class EchoServerSessionAllocator
{
  private final IntHashSet used;
  private final SecureRandom random;
  private final int min;
  private final int max_count;

  private EchoServerSessionAllocator(
    final int in_min,
    final int in_max,
    final SecureRandom in_random)
  {
    if (in_max < in_min) {
      throw new IllegalArgumentException(
        String.format(
          "Maximum value %d must be >= minimum value %d",
          Integer.valueOf(in_max),
          Integer.valueOf(in_min)));
    }

    this.used = new IntHashSet();
    this.min = in_min;
    this.max_count = Math.max(in_max - in_min, 1);
    this.random = Objects.requireNonNull(in_random, "random");
  }

  /**
   * Create a new session allocator.
   *
   * @param in_min    The minimum session ID (inclusive)
   * @param in_max    The maximum session ID (exclusive)
   * @param in_random A random number generator
   *
   * @return A new allocator
   */

  public static EchoServerSessionAllocator create(
    final int in_min,
    final int in_max,
    final SecureRandom in_random)
  {
    return new EchoServerSessionAllocator(in_min, in_max, in_random);
  }

  /**
   * Allocate a new session.
   *
   * @return A new session ID
   *
   * @throws EchoServerSessionAllocationException If there are no non-allocated sessions left
   */

  public int allocate()
    throws EchoServerSessionAllocationException
  {
    if (this.used.size() == this.max_count) {
      throw new EchoServerSessionAllocationException(
        "No session IDs left to allocate");
    }

    for (int index = 0; index < this.max_count; ++index) {
      final int session = this.random.nextInt(this.max_count) + this.min;
      if (!this.used.contains(session)) {
        this.used.add(session);
        return session;
      }
    }

    throw new EchoServerSessionAllocationException(
      String.format(
        "Unable to allocate a session ID after %d attempts (%d values in use)",
        Integer.valueOf(this.max_count),
        Integer.valueOf(this.used.size())));
  }

  /**
   * Free a session. After this method returns, {@code session} becomes eligible
   * for allocation by future calls to {@link #allocate()}.
   *
   * @param session The session to free
   */

  public void free(final int session)
  {
    this.used.remove(session);
  }
}

The ClientState class is responsible for polling all of the current duologues in order to allow them to process pending messages, and is also responsible for deleting expired duologues:

EchoServer.ClientState.poll()

    public void poll()
    {
      this.exec.assertIsExecutorThread();

      final Iterator<Map.Entry<Integer, EchoServerDuologue>> iter =
        this.client_duologues.entrySet().iterator();

      /*
       * Get the current time; used to expire duologues.
       */

      final Instant now = this.clock.instant();

      while (iter.hasNext()) {
        final Map.Entry<Integer, EchoServerDuologue> entry = iter.next();
        final EchoServerDuologue duologue = entry.getValue();

        final String session_name =
          Integer.toString(entry.getKey().intValue());

        /*
         * If the duologue has either been closed, or has expired, it needs
         * to be deleted.
         */

        boolean delete = false;
        if (duologue.isExpired(now)) {
          LOG.debug("[{}] duologue expired", session_name);
          delete = true;
        }

        if (duologue.isClosed()) {
          LOG.debug("[{}] duologue closed", session_name);
          delete = true;
        }

        if (delete) {
          try {
            duologue.close();
          } finally {
            LOG.debug("[{}] deleted duologue", session_name);
            iter.remove();
            this.port_allocator.free(duologue.portData());
            this.port_allocator.free(duologue.portControl());
            this.session_allocator.free(duologue.session());
            this.address_counter.decrement(duologue.ownerAddress());
          }
          continue;
        }

        /*
         * Otherwise, poll the duologue for activity.
         */

        duologue.poll();
      }
    }
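The expiry logic depends on two details: deleting entries through the iterator while the map is being traversed, and taking the current time from an injected Clock so that the behaviour can be tested. The following is a small, self-contained sketch of both, using hypothetical names (ExpirySketch, register(), sweep()) and a map of deadlines in place of real duologues.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class ExpirySketch
{
  private final Map<Integer, Instant> expiries = new HashMap<>();
  private final Clock clock;

  ExpirySketch(final Clock in_clock)
  {
    this.clock = in_clock;
  }

  /** Register a session that expires ttl_seconds from the clock's current time. */
  void register(final int session, final long ttl_seconds)
  {
    this.expiries.put(session, this.clock.instant().plusSeconds(ttl_seconds));
  }

  /** Remove every session whose deadline is strictly before now. */
  int sweep(final Instant now)
  {
    int removed = 0;
    final Iterator<Map.Entry<Integer, Instant>> iter =
      this.expiries.entrySet().iterator();
    while (iter.hasNext()) {
      if (now.isAfter(iter.next().getValue())) {
        // Iterator.remove() is the safe way to delete during iteration;
        // calling Map.remove() here would risk a
        // ConcurrentModificationException.
        iter.remove();
        ++removed;
      }
    }
    return removed;
  }

  int size()
  {
    return this.expiries.size();
  }
}
```

In a test, passing Clock.fixed(...) to the constructor makes the deadlines deterministic, which is the same reason the server passes a Clock to EchoServerDuologue.create() rather than reading the system time directly.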

Finally, the actual per-duologue state and behaviour is encapsulated by the EchoServerDuologue class:

EchoServerDuologue

public final class EchoServerDuologue implements AutoCloseable
{
  private static final Logger LOG =
    LoggerFactory.getLogger(EchoServerDuologue.class);

  private static final Pattern PATTERN_ECHO =
    Pattern.compile("^ECHO (.*)$");

  private final UnsafeBuffer send_buffer;
  private final EchoServerExecutorService exec;
  private final Instant initial_expire;
  private final InetAddress owner;
  private final int port_data;
  private final int port_control;
  private final int session;
  private final FragmentAssembler handler;
  private boolean closed;
  private Publication publication;
  private Subscription subscription;

  private EchoServerDuologue(
    final EchoServerExecutorService in_exec,
    final Instant in_initial_expire,
    final InetAddress in_owner_address,
    final int in_session,
    final int in_port_data,
    final int in_port_control)
  {
    this.exec =
      Objects.requireNonNull(in_exec, "executor");
    this.initial_expire =
      Objects.requireNonNull(in_initial_expire, "initial_expire");
    this.owner =
      Objects.requireNonNull(in_owner_address, "owner");

    this.send_buffer =
      new UnsafeBuffer(BufferUtil.allocateDirectAligned(1024, 16));

    this.session = in_session;
    this.port_data = in_port_data;
    this.port_control = in_port_control;
    this.closed = false;

    this.handler = new FragmentAssembler((data, offset, length, header) -> {
      try {
        this.onMessageReceived(data, offset, length, header);
      } catch (final IOException e) {
        LOG.error("failed to send message: ", e);
        this.close();
      }
    });
  }

  /**
   * Create a new duologue. This will create a new publication and subscription
   * pair using a specific session ID and intended only for a single client
   * at a given address.
   *
   * @param aeron         The Aeron instance
   * @param clock         A clock used for time-related operations
   * @param exec          An executor
   * @param local_address The local address of the server ports
   * @param owner_address The address of the client
   * @param session       The session ID
   * @param port_data     The data port
   * @param port_control  The control port
   *
   * @return A new duologue
   */

  public static EchoServerDuologue create(
    final Aeron aeron,
    final Clock clock,
    final EchoServerExecutorService exec,
    final InetAddress local_address,
    final InetAddress owner_address,
    final int session,
    final int port_data,
    final int port_control)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(clock, "clock");
    Objects.requireNonNull(exec, "exec");
    Objects.requireNonNull(local_address, "local_address");
    Objects.requireNonNull(owner_address, "owner_address");

    LOG.debug(
      "creating new duologue at {} ({},{}) session {} for {}",
      local_address,
      Integer.valueOf(port_data),
      Integer.valueOf(port_control),
      Integer.toString(session),
      owner_address);

    final Instant initial_expire =
      clock.instant().plus(10L, ChronoUnit.SECONDS);

    final ConcurrentPublication pub =
      EchoChannels.createPublicationDynamicMDCWithSession(
        aeron,
        local_address,
        port_control,
        EchoServer.ECHO_STREAM_ID,
        session);

    try {
      final EchoServerDuologue duologue =
        new EchoServerDuologue(
          exec,
          initial_expire,
          owner_address,
          session,
          port_data,
          port_control);

      final Subscription sub =
        EchoChannels.createSubscriptionWithHandlersAndSession(
          aeron,
          local_address,
          port_data,
          EchoServer.ECHO_STREAM_ID,
          duologue::onClientConnected,
          duologue::onClientDisconnected,
          session);

      duologue.setPublicationSubscription(pub, sub);
      return duologue;
    } catch (final Exception e) {
      try {
        pub.close();
      } catch (final Exception pe) {
        e.addSuppressed(pe);
      }
      throw e;
    }
  }

  /**
   * Poll the duologue for activity.
   */

  public void poll()
  {
    this.exec.assertIsExecutorThread();
    this.subscription.poll(this.handler, 10);
  }

  private void onMessageReceived(
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
    throws IOException
  {
    this.exec.assertIsExecutorThread();

    final String session_name =
      Integer.toString(header.sessionId());
    final String message =
      EchoMessages.parseMessageUTF8(buffer, offset, length);

    /*
     * Try to parse an ECHO message.
     */

    LOG.debug("[{}] received: {}", session_name, message);
    final Matcher echo_matcher = PATTERN_ECHO.matcher(message);
    if (echo_matcher.matches()) {
      EchoMessages.sendMessage(
        this.publication,
        this.send_buffer,
        "ECHO " + echo_matcher.group(1));
      return;
    }

    /*
     * Otherwise, fail and close this duologue.
     */

    try {
      EchoMessages.sendMessage(
        this.publication,
        this.send_buffer,
        "ERROR bad message");
    } finally {
      this.close();
    }
  }

  private void setPublicationSubscription(
    final Publication in_publication,
    final Subscription in_subscription)
  {
    this.publication =
      Objects.requireNonNull(in_publication, "Publication");
    this.subscription =
      Objects.requireNonNull(in_subscription, "Subscription");
  }

  private void onClientDisconnected(
    final Image image)
  {
    this.exec.execute(() -> {
      final int image_session = image.sessionId();
      final String session_name = Integer.toString(image_session);
      final InetAddress address = EchoAddresses.extractAddress(image.sourceIdentity());

      if (this.subscription.imageCount() == 0) {
        LOG.debug("[{}] last client ({}) disconnected", session_name, address);
        this.close();
      } else {
        LOG.debug("[{}] client {} disconnected", session_name, address);
      }
    });
  }

  private void onClientConnected(
    final Image image)
  {
    this.exec.execute(() -> {
      final InetAddress remote_address =
        EchoAddresses.extractAddress(image.sourceIdentity());

      if (Objects.equals(remote_address, this.owner)) {
        LOG.debug("[{}] client with correct IP connected",
                  Integer.toString(image.sessionId()));
      } else {
        LOG.error("connecting client has wrong address: {}",
                  remote_address);
      }
    });
  }

  /**
   * @param now The current time
   *
   * @return {@code true} if this duologue has no subscribers and the current
   * time {@code now} is after the intended expiry date of the duologue
   */

  public boolean isExpired(
    final Instant now)
  {
    Objects.requireNonNull(now, "now");

    this.exec.assertIsExecutorThread();

    return this.subscription.imageCount() == 0
      && now.isAfter(this.initial_expire);
  }

  /**
   * @return {@code true} iff {@link #close()} has been called
   */

  public boolean isClosed()
  {
    this.exec.assertIsExecutorThread();

    return this.closed;
  }

  @Override
  public void close()
  {
    this.exec.assertIsExecutorThread();

    if (!this.closed) {
      try {
        try {
          this.publication.close();
        } finally {
          this.subscription.close();
        }
      } finally {
        this.closed = true;
      }
    }
  }

  /**
   * @return The data port
   */

  public int portData()
  {
    return this.port_data;
  }

  /**
   * @return The control port
   */

  public int portControl()
  {
    return this.port_control;
  }

  /**
   * @return The IP address that is permitted to participate in this duologue
   */

  public InetAddress ownerAddress()
  {
    return this.owner;
  }

  /**
   * @return The session ID of the duologue
   */

  public int session()
  {
    return this.session;
  }
}
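The protocol decision made in onMessageReceived() can be separated from the Aeron plumbing and exercised on its own. The following sketch does that with a hypothetical class, EchoProtocolSketch, that returns the reply for a well-formed message and an empty value when the duologue should be closed. It uses the same regular expression as PATTERN_ECHO above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EchoProtocolSketch
{
  private static final Pattern PATTERN_ECHO =
    Pattern.compile("^ECHO (.*)$");

  private EchoProtocolSketch() { }

  /**
   * @return The reply to send for a well-formed message, or an empty value
   * if the message violates the protocol and the duologue should be closed
   */
  static Optional<String> reply(final String message)
  {
    final Matcher matcher = PATTERN_ECHO.matcher(message);
    if (matcher.matches()) {
      return Optional.of("ECHO " + matcher.group(1));
    }
    return Optional.empty();
  }
}
```

Note that the pattern requires the space after ECHO, and that "." does not match line terminators by default, so a message containing a newline is treated as a protocol violation.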

The EchoServerDuologue.onMessageReceived() method responds with an ERROR message when the client sends a message that the server does not understand, and then closes the duologue's publication and subscription. This addresses the noted issues with bad client messages by effectively disconnecting clients that violate the protocol.

The use of dynamic MDC on the all clients channel subscriptions and publications and on the duologue subscriptions and publications ensures that clients can connect to the server and receive responses even when those clients are behind NAT. This addresses the issues noted about NAT.

In order to reduce the amount of duplicate code that results from creating publications and subscriptions, we abstract the calls to create them into their own class, EchoChannels:

EchoChannels

public final class EchoChannels
{
  private EchoChannels()
  {

  }

  /**
   * Create a publication at the given address and port, using the given
   * stream ID.
   *
   * @param aeron     The Aeron instance
   * @param address   The address
   * @param port      The port
   * @param stream_id The stream ID
   *
   * @return A new publication
   */

  public static ConcurrentPublication createPublication(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int stream_id)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String pub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(
          new StringBuilder(64)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .build();

    return aeron.addPublication(pub_uri, stream_id);
  }

  /**
   * Create a subscription with a control port (for dynamic MDC) at the given
   * address and port, using the given stream ID.
   *
   * @param aeron     The Aeron instance
   * @param address   The address
   * @param port      The port
   * @param stream_id The stream ID
   *
   * @return A new subscription
   */

  public static Subscription createSubscriptionDynamicMDC(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int stream_id)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String sub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .controlEndpoint(
          new StringBuilder(64)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .controlMode("dynamic")
        .build();

    return aeron.addSubscription(sub_uri, stream_id);
  }

  /**
   * Create a publication with a control port (for dynamic MDC) at the given
   * address and port, using the given stream ID.
   *
   * @param aeron     The Aeron instance
   * @param address   The address
   * @param port      The port
   * @param stream_id The stream ID
   *
   * @return A new publication
   */

  public static ConcurrentPublication createPublicationDynamicMDC(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int stream_id)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String pub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .controlEndpoint(
          new StringBuilder(32)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .controlMode("dynamic")
        .build();

    return aeron.addPublication(pub_uri, stream_id);
  }

  /**
   * Create a subscription at the given address and port, using the given
   * stream ID and image handlers.
   *
   * @param aeron                The Aeron instance
   * @param address              The address
   * @param port                 The port
   * @param stream_id            The stream ID
   * @param on_image_available   Called when an image becomes available
   * @param on_image_unavailable Called when an image becomes unavailable
   *
   * @return A new subscription
   */

  public static Subscription createSubscriptionWithHandlers(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int stream_id,
    final AvailableImageHandler on_image_available,
    final UnavailableImageHandler on_image_unavailable)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");
    Objects.requireNonNull(on_image_available, "on_image_available");
    Objects.requireNonNull(on_image_unavailable, "on_image_unavailable");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String sub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(
          new StringBuilder(32)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .build();

    return aeron.addSubscription(
      sub_uri,
      stream_id,
      on_image_available,
      on_image_unavailable);
  }

  /**
   * Create a publication with a control port (for dynamic MDC) at the given
   * address and port, using the given stream ID and session ID.
   *
   * @param aeron     The Aeron instance
   * @param address   The address
   * @param port      The port
   * @param stream_id The stream ID
   * @param session   The session ID
   *
   * @return A new publication
   */

  public static ConcurrentPublication createPublicationDynamicMDCWithSession(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int stream_id,
    final int session)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String pub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .controlEndpoint(
          new StringBuilder(32)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .controlMode("dynamic")
        .sessionId(Integer.valueOf(session))
        .build();

    return aeron.addPublication(pub_uri, stream_id);
  }

  /**
   * Create a subscription at the given address and port, using the given
   * stream ID, session ID, and image handlers.
   *
   * @param aeron                The Aeron instance
   * @param address              The address
   * @param port                 The port
   * @param stream_id            The stream ID
   * @param on_image_available   Called when an image becomes available
   * @param on_image_unavailable Called when an image becomes unavailable
   * @param session              The session ID
   *
   * @return A new subscription
   */

  public static Subscription createSubscriptionWithHandlersAndSession(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int stream_id,
    final AvailableImageHandler on_image_available,
    final UnavailableImageHandler on_image_unavailable,
    final int session)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");
    Objects.requireNonNull(on_image_available, "on_image_available");
    Objects.requireNonNull(on_image_unavailable, "on_image_unavailable");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String sub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(
          new StringBuilder(32)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .sessionId(Integer.valueOf(session))
        .build();

    return aeron.addSubscription(
      sub_uri,
      stream_id,
      on_image_available,
      on_image_unavailable);
  }

  /**
   * Create a subscription with a control port (for dynamic MDC) at the given
   * address and port, using the given stream ID, and session ID.
   *
   * @param aeron     The Aeron instance
   * @param address   The address
   * @param port      The port
   * @param session   The session ID
   * @param stream_id The stream ID
   *
   * @return A new subscription
   */

  public static Subscription createSubscriptionDynamicMDCWithSession(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int session,
    final int stream_id)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String sub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .controlEndpoint(
          new StringBuilder(64)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .controlMode("dynamic")
        .sessionId(Integer.valueOf(session))
        .build();

    return aeron.addSubscription(sub_uri, stream_id);
  }

  /**
   * Create a publication at the given address and port, using the given
   * stream ID and session ID.
   *
   * @param aeron     The Aeron instance
   * @param address   The address
   * @param port      The port
   * @param session   The session ID
   * @param stream_id The stream ID
   *
   * @return A new publication
   */

  public static ConcurrentPublication createPublicationWithSession(
    final Aeron aeron,
    final InetAddress address,
    final int port,
    final int session,
    final int stream_id)
  {
    Objects.requireNonNull(aeron, "aeron");
    Objects.requireNonNull(address, "address");

    final String addr_string =
      address.toString().replaceFirst("^/", "");

    final String pub_uri =
      new ChannelUriStringBuilder()
        .reliable(TRUE)
        .media("udp")
        .endpoint(
          new StringBuilder(64)
            .append(addr_string)
            .append(":")
            .append(Integer.toUnsignedString(port))
            .toString())
        .sessionId(Integer.valueOf(session))
        .build();

    return aeron.addPublication(pub_uri, stream_id);
  }
}
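It can help to see roughly what the channel URIs built by these methods look like. The sketch below assembles equivalent strings by hand using only the JDK. The class ChannelUriSketch and its methods are hypothetical; the parameter names (endpoint, control, control-mode, session-id, reliable) are those of the Aeron channel URI syntax, but the order in which ChannelUriStringBuilder emits them is not reproduced here and may differ. The sketch also uses getHostAddress() rather than stripping the leading slash from toString(), because toString() yields "hostname/address" when the address carries a host name. IPv6 addresses, which need square brackets in an endpoint, are not handled.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class ChannelUriSketch
{
  private ChannelUriSketch() { }

  /** Build an IPv4 address from four octets without a DNS lookup. */
  static InetAddress ipv4(final int a, final int b, final int c, final int d)
  {
    try {
      return InetAddress.getByAddress(
        new byte[] { (byte) a, (byte) b, (byte) c, (byte) d });
    } catch (final UnknownHostException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /** A plain unicast channel at the given address and port. */
  static String endpoint(final InetAddress address, final int port)
  {
    return "aeron:udp?endpoint=" + hostPort(address, port) + "|reliable=true";
  }

  /** A dynamic MDC channel with a control port and an explicit session ID. */
  static String dynamicMDC(
    final InetAddress address,
    final int port,
    final int session)
  {
    return "aeron:udp?control=" + hostPort(address, port)
      + "|control-mode=dynamic"
      + "|session-id=" + session
      + "|reliable=true";
  }

  private static String hostPort(final InetAddress address, final int port)
  {
    // getHostAddress() returns only the numeric form of the address.
    return address.getHostAddress() + ":" + port;
  }
}
```

Printing these strings next to the value of pub_uri or sub_uri in a debugger is a quick way to check that a publication and the subscription meant to receive it agree on address, port, and session.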

In order to address the issues noted about message sending, we define a sendMessage() method that retries sending for up to ~500ms (five attempts, 100ms apart) before raising an exception. This ensures that a message that cannot be sent is reported to the caller rather than silently “going missing”:

EchoMessages

public final class EchoMessages
{
  private static final Logger LOG = LoggerFactory.getLogger(EchoMessages.class);

  private EchoMessages()
  {

  }

  /**
   * Send the given message to the given publication. If the publication fails
   * to accept the message, the method will make up to {@code 5} attempts,
   * waiting {@code 100} milliseconds after each failed attempt, before
   * throwing an exception.
   *
   * @param pub    The publication
   * @param buffer A buffer that will hold the message for sending
   * @param text   The message
   *
   * @return The new publication stream position
   *
   * @throws IOException If the message cannot be sent
   */

  public static long sendMessage(
    final Publication pub,
    final UnsafeBuffer buffer,
    final String text)
    throws IOException
  {
    Objects.requireNonNull(pub, "publication");
    Objects.requireNonNull(buffer, "buffer");
    Objects.requireNonNull(text, "text");

    LOG.trace("[{}] send: {}", Integer.toString(pub.sessionId()), text);

    final byte[] value = text.getBytes(UTF_8);
    buffer.putBytes(0, value);

    long result = 0L;
    for (int index = 0; index < 5; ++index) {
      result = pub.offer(buffer, 0, value.length);
      if (result < 0L) {
        try {
          Thread.sleep(100L);
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        continue;
      }
      return result;
    }

    throw new IOException(
      "Could not send message: Error code: " + errorCodeName(result));
  }

  private static String errorCodeName(final long result)
  {
    if (result == Publication.NOT_CONNECTED) {
      return "Not connected";
    }
    if (result == Publication.ADMIN_ACTION) {
      return "Administrative action";
    }
    if (result == Publication.BACK_PRESSURED) {
      return "Back pressured";
    }
    if (result == Publication.CLOSED) {
      return "Publication is closed";
    }
    if (result == Publication.MAX_POSITION_EXCEEDED) {
      return "Maximum term position exceeded";
    }
    throw new IllegalStateException();
  }

  /**
   * Extract a UTF-8 encoded string from the given buffer.
   *
   * @param buffer The buffer
   * @param offset The offset from the start of the buffer
   * @param length The number of bytes to extract
   *
   * @return A string
   */

  public static String parseMessageUTF8(
    final DirectBuffer buffer,
    final int offset,
    final int length)
  {
    Objects.requireNonNull(buffer, "buffer");
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);
    return new String(data, UTF_8);
  }
}
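One property of the retry loop above is worth noting: when the thread is interrupted, the loop restores the interrupt flag and carries on, so each later call to Thread.sleep() throws immediately and the remaining retries happen back to back. The following is a minimal sketch of a variant that gives up as soon as it is interrupted. It is not part of the guide's code: RetrySketch and offerWithRetry are hypothetical names, and a LongSupplier stands in for the call to Publication.offer() so that the example runs without Aeron.

```java
import java.io.IOException;
import java.util.function.LongSupplier;

final class RetrySketch
{
  private RetrySketch()
  {

  }

  /**
   * Call the given offer function up to {@code attempts} times, sleeping
   * between failed attempts. A negative result is treated as a failure,
   * mirroring the convention used by Publication.offer().
   */

  static long offerWithRetry(
    final LongSupplier offer,
    final int attempts,
    final long sleep_ms)
    throws IOException
  {
    long result = 0L;
    for (int index = 0; index < attempts; ++index) {
      result = offer.getAsLong();
      if (result >= 0L) {
        return result;
      }
      try {
        Thread.sleep(sleep_ms);
      } catch (final InterruptedException e) {
        // Restore the flag for callers, and stop retrying immediately.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while retrying", e);
      }
    }
    throw new IOException("Could not send message: result " + result);
  }
}
```

Whether giving up on interruption is the right policy depends on the application; the point is only that the choice should be made deliberately.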

The server’s run() method should not be surprising:

EchoServer.run()

  public void run()
  {
    try (final Publication publication = this.setupAllClientsPublication()) {
      try (final Subscription subscription = this.setupAllClientsSubscription()) {

        final FragmentHandler handler =
          new FragmentAssembler(
            (buffer, offset, length, header) ->
              this.onInitialClientMessage(
                publication,
                buffer,
                offset,
                length,
                header));

        while (true) {
          this.executor.execute(() -> {
            subscription.poll(handler, 100);
            this.clients.poll();
          });

          try {
            Thread.sleep(100L);
          } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }
  }

EchoServer.setupAllClientsPublication()

  private Publication setupAllClientsPublication()
  {
    return EchoChannels.createPublicationDynamicMDC(
      this.aeron,
      this.configuration.localAddress(),
      this.configuration.localInitialControlPort(),
      ECHO_STREAM_ID);
  }

EchoServer.setupAllClientsSubscription()

  private Subscription setupAllClientsSubscription()
  {
    return EchoChannels.createSubscriptionWithHandlers(
      this.aeron,
      this.configuration.localAddress(),
      this.configuration.localInitialPort(),
      ECHO_STREAM_ID,
      this::onInitialClientConnected,
      this::onInitialClientDisconnected);
  }

4.6 Echo 2.0 Client

The client implementation is only slightly more complex than the original echo client.

The create() method for the client has similar modifications to those made in the server implementation (such as the introduction of a reserved range of session IDs):

EchoClient.create()

  public static EchoClient create(
    final EchoClientConfiguration configuration)
    throws EchoClientException
  {
    Objects.requireNonNull(configuration, "configuration");

    final String directory =
      configuration.baseDirectory()
        .toAbsolutePath()
        .toString();

    final MediaDriver.Context media_context =
      new MediaDriver.Context()
        .dirDeleteOnStart(true)
        .publicationReservedSessionIdLow(EchoSessions.RESERVED_SESSION_ID_LOW)
        .publicationReservedSessionIdHigh(EchoSessions.RESERVED_SESSION_ID_HIGH)
        .aeronDirectoryName(directory);

    final Aeron.Context aeron_context =
      new Aeron.Context().aeronDirectoryName(directory);

    MediaDriver media_driver = null;

    try {
      media_driver = MediaDriver.launch(media_context);

      Aeron aeron = null;
      try {
        aeron = Aeron.connect(aeron_context);
      } catch (final Exception e) {
        closeIfNotNull(aeron);
        throw e;
      }

      return new EchoClient(media_driver, aeron, configuration);
    } catch (final Exception e) {
      try {
        closeIfNotNull(media_driver);
      } catch (final Exception c_ex) {
        e.addSuppressed(c_ex);
      }
      throw new EchoClientCreationException(e);
    }
  }

The run() method is somewhat more involved:

EchoClient.run()

  public void run()
    throws EchoClientException
  {
    /*
     * Generate a one-time pad.
     */

    this.duologue_key = this.random.nextInt();

    final UnsafeBuffer buffer =
      new UnsafeBuffer(BufferUtil.allocateDirectAligned(1024, 16));

    final String session_name;
    try (final Subscription subscription = this.setupAllClientsSubscription()) {
      try (final Publication publication = this.setupAllClientsPublication()) {

        /*
         * Send a one-time pad to the server.
         */

        EchoMessages.sendMessage(
          publication,
          buffer,
          "HELLO " + Integer.toUnsignedString(this.duologue_key, 16).toUpperCase());

        session_name = Integer.toString(publication.sessionId());
        this.waitForConnectResponse(subscription, session_name);
      } catch (final IOException e) {
        throw new EchoClientIOException(e);
      }
    }

    /*
     * Connect to the publication and subscription that the server has sent
     * back to this client.
     */

    try (final Subscription subscription = this.setupConnectSubscription()) {
      try (final Publication publication = this.setupConnectPublication()) {
        this.runEchoLoop(buffer, session_name, subscription, publication);
      } catch (final IOException e) {
        throw new EchoClientIOException(e);
      }
    }
  }

The code should, by now, be fairly self-explanatory. We configure a publication and subscription to the server’s all clients channel, and send a HELLO message using a one-time pad with which we expect the server to encrypt a response. At each step, we wait for publications and subscriptions to become connected, or we time out and fail loudly with an error if they don’t (addressing the issues noted about disconnections).
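As a worked example of the key exchange, the sketch below can be checked against the values from the example session in section 4.7. It assumes that the server encrypts the duologue session ID by XORing it with the key sent in the HELLO message, which is consistent with the values logged there. The class and method names are illustrative and are not part of the guide's code.

```java
final class OneTimePadSketch
{
  private OneTimePadSketch()
  {

  }

  /**
   * Parse a key or encrypted value from the hexadecimal form used in
   * the HELLO and CONNECT messages.
   */

  static int parseHex(final String text)
  {
    return Integer.parseUnsignedInt(text, 16);
  }

  /**
   * Apply the pad to a value. XOR is its own inverse, so the same
   * operation both encrypts and decrypts.
   */

  static int applyPad(final int key, final int value)
  {
    return key ^ value;
  }
}
```

With the key 611786D9 from the HELLO message and the value 129F3C18 from the CONNECT response (312425496 in decimal), applyPad() yields 1938340545, which is the session ID the server logged when it created the duologue.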

Assuming that we receive all the data we expect from the server, we enter into the now familiar runEchoLoop method:

EchoClient.runEchoLoop()

  private void runEchoLoop(
    final UnsafeBuffer buffer,
    final String session_name,
    final Subscription subscription,
    final Publication publication)
    throws IOException
  {
    final FragmentHandler handler =
      new FragmentAssembler(
        (data, offset, length, header) ->
          onEchoResponse(session_name, data, offset, length));

    while (true) {

      /*
       * Send ECHO messages to the server and wait for responses.
       */

      EchoMessages.sendMessage(
        publication,
        buffer,
        "ECHO " + Long.toUnsignedString(this.random.nextLong(), 16));

      for (int index = 0; index < 100; ++index) {
        subscription.poll(handler, 1000);

        try {
          Thread.sleep(10L);
        } catch (final InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

The use of EchoMessages.sendMessage() will cause the loop to break with an exception if the server disconnects the client.

4.7 Example Usage

We start up a server instance on apricot, binding the server to ports 9000 and 9001 on address 5.6.7.8. The server will start assigning client duologues at UDP port 9010, and is limited to a maximum of 4 clients:

apricot$ java -cp aeron-guide.jar com.io7m.aeron_guide.take2.EchoServer /tmp/aeron-server 5.6.7.8 9000 9001 9010 4

We start up a client instance on maize, telling it to connect to apricot and immediately disconnect it (for brevity) after it has sent and received one ECHO message:

maize$ java -cp aeron-guide.jar com.io7m.aeron_guide.take2.EchoClient /tmp/aeron-client 5.6.7.8 9000 9001
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: initial subscription connected
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: initial publication connected
TRACE [main] com.io7m.aeron_guide.take2.EchoMessages: [-555798973] send: HELLO 611786D9
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: waiting for response
TRACE [main] com.io7m.aeron_guide.take2.EchoClient: [-555798973] response: -555798973 CONNECT 9014 9011 129F3C18
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: [-555798973] connect 9014 9011 (encrypted 312425496)
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: CONNECT subscription connected
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: CONNECT publication connected
TRACE [main] com.io7m.aeron_guide.take2.EchoMessages: [1938340545] send: ECHO 12cbcf2e9c84b2f0
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: [-555798973] response: ECHO 12cbcf2e9c84b2f0
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: [-555798973] ECHO 12cbcf2e9c84b2f0

We can see that despite the fact that maize is behind the NAT router tomato, it can still connect to and receive responses from apricot. The address of maize (10.10.0.5) from the perspective of apricot appears to be that of tomato (10.10.0.1):

(apricot)

DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] initial client connected (10.10.0.1:55297)
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] received: HELLO 611786D9
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServerDuologue: creating new duologue at /5.6.7.8 (9014,9011) session 1938340545 for /10.10.0.1
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] created new duologue
TRACE [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoMessages: [-2147483648] send: -555798973 CONNECT 9014 9011 129F3C18
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] initial client disconnected (10.10.0.1:55297)
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServerDuologue: [1938340545] client with correct IP connected
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServerDuologue: [1938340545] received: ECHO 12cbcf2e9c84b2f0
TRACE [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoMessages: [1938340545] send: ECHO 12cbcf2e9c84b2f0
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServerDuologue: [1938340545] last client (/10.10.0.1) disconnected
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] duologue expired
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] duologue closed
DEBUG [com.io7m.aeron_guide.take2.server[16]] com.io7m.aeron_guide.take2.EchoServer: [-555798973] deleted duologue

The packet filter logs on tomato clearly show the datagrams passing through:

rule 1/(match) pass in on em1: 10.10.0.5.43645 > 5.6.7.8.9001: udp 36 (DF)
rule 1/(match) pass out on em0: 10.10.0.1.50088 > 5.6.7.8.9001: udp 36 (DF)
rule 1/(match) pass in on em1: 10.10.0.5.54239 > 5.6.7.8.9000: udp 40 (DF)
rule 1/(match) pass out on em0: 10.10.0.1.55297 > 5.6.7.8.9000: udp 40 (DF)
rule 1/(match) pass in on em1: 10.10.0.5.49564 > 5.6.7.8.9011: udp 36 (DF)
rule 1/(match) pass out on em0: 10.10.0.1.60760 > 5.6.7.8.9011: udp 36 (DF)
rule 1/(match) pass in on em1: 10.10.0.5.36996 > 5.6.7.8.9014: udp 40 (DF)
rule 1/(match) pass out on em0: 10.10.0.1.60643 > 5.6.7.8.9014: udp 40 (DF)

Starting up three client instances on maize and then trying to start up a fourth shows that the server will not allow any more connections from maize’s IP address:

maize$ java -cp aeron-guide.jar com.io7m.aeron_guide.take2.EchoClient /tmp/aeron-client4 5.6.7.8 9000 9001
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: initial subscription connected
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: initial publication connected
TRACE [main] com.io7m.aeron_guide.take2.EchoMessages: [-2147483648] send: HELLO 16418C02
DEBUG [main] com.io7m.aeron_guide.take2.EchoClient: waiting for response
TRACE [main] com.io7m.aeron_guide.take2.EchoClient: [-2147483648] response: -2147483648 ERROR too many connections for IP address
ERROR [main] com.io7m.aeron_guide.take2.EchoClient: [-2147483648] server returned an error: too many connections for IP address
Exception in thread "main" com.io7m.aeron_guide.take2.EchoClientRejectedException: Server rejected this client
    at com.io7m.aeron_guide.take2.EchoClient.waitForConnectResponse(EchoClient.java:353)
    at com.io7m.aeron_guide.take2.EchoClient.run(EchoClient.java:201)
    at com.io7m.aeron_guide.take2.EchoClient.main(EchoClient.java:165)

5 Further Enhancements

5.1 IPv6

Currently, the implementation as described will work over IPv6 without modification. The main issue, however, is that the IP-level restrictions that have been added (such as “no more than 4 connections per IP address”) are ineffective in IPv6. The IP restrictions are predicated on the notion that (as is true of IPv4 today) IP addresses are in short supply and so a casual malicious client will not have control of a large collection of IP addresses that could be used to circumvent access restrictions.

In IPv6, every client has access to 18446744073709551616 (2 ^ 64) addresses at a minimum. In order to implement IP access restrictions, the rules would need to be expanded to exclude entire subnets.
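One way to express such a rule is to count connections per /64 prefix rather than per address when the peer is an IPv6 host. The sketch below only illustrates that idea, under the assumption that a /64 is the smallest block a client is likely to control. AddressKeySketch and limitKey are hypothetical names that do not appear in the server implementation.

```java
import java.net.Inet6Address;
import java.net.InetAddress;

final class AddressKeySketch
{
  private AddressKeySketch()
  {

  }

  /**
   * Produce the key under which connections from the given address
   * would be counted: the first 64 bits for IPv6, the full address
   * for IPv4.
   */

  static String limitKey(final InetAddress address)
  {
    if (address instanceof Inet6Address) {
      final byte[] raw = address.getAddress();
      final StringBuilder key = new StringBuilder(20);
      // The first 8 of the 16 address bytes form the /64 prefix.
      for (int index = 0; index < 8; ++index) {
        key.append(String.format("%02x", raw[index] & 0xff));
      }
      return key.append("/64").toString();
    }
    return address.getHostAddress();
  }
}
```

Two clients at 2001:db8:0:1::5 and 2001:db8:0:1::ffff would then share a single connection budget, while an IPv4 client continues to be counted by its individual address.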

5.2 Endpoint-level Filtering

The current implementation parses the strings returned by Image.sourceIdentity() and acts based on the parsed addresses. This is somewhat unpleasant and is arguably working at the wrong level of abstraction for an Aeron program. For example, if we wanted to take the client and server program and run them over a transport that was not UDP/IP, they would need to be rewritten.
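For reference, the kind of parsing involved can be sketched as follows. The example assumes that the source identity has the host:port form seen in the server logs (for example, 10.10.0.1:55297) and uses the fake:// scheme trick described in the footnotes. SourceIdentitySketch is an illustrative name, not a class from the guide.

```java
import java.net.InetSocketAddress;
import java.net.URI;

final class SourceIdentitySketch
{
  private SourceIdentitySketch()
  {

  }

  /**
   * Split a "host:port" string into host and port segments by
   * prepending a dummy scheme and letting java.net.URI do the work.
   * No name resolution is performed.
   */

  static InetSocketAddress parse(final String identity)
  {
    final URI uri = URI.create("fake://" + identity);
    if (uri.getHost() == null || uri.getPort() == -1) {
      throw new IllegalArgumentException(
        "Unparseable source identity: " + identity);
    }
    return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
  }
}
```

Everything the server decides from here on depends on the transport exposing an IP address and port in this textual form, which is exactly the coupling the paragraph above objects to.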

Aeron is extensible in that it allows programmers to provide their own endpoints. An endpoint can be considered as an abstraction over a resource such as a UDP socket. Aeron comes with a number of custom endpoint implementations that allow for functionality such as simulating packet loss, congestion control, and more. The somewhat crude IP-level restrictions implemented by the server might better be implemented with a custom endpoint implementation.

See the driver extension directory.

5.3 Per-packet Encryption

If we wanted to protect against malicious clients that have the capability of reading and/or modifying the packets sent by another client (by ARP spoofing or some other method), we would need to add some form of authentication and encryption to the protocol.

Currently, the application would need to arrange for the client and server to agree on a session key and would then need to encrypt each message given to Publication.offer() (and accordingly decrypt each message retrieved by Subscription.poll()). This would entail a fair amount of overhead both in the large number of crypto operations required and the inevitable per-message padding required for many algorithm modes to operate.
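To give a sense of the padding overhead, the following sketch encrypts a message with AES in CBC mode with PKCS#5 padding, using the standard javax.crypto API, and reports the length of the ciphertext. The choice of algorithm and mode is an assumption made purely for illustration and is not a recommendation (CBC on its own provides no authentication). The throwaway key generated here stands in for a properly negotiated session key, and PaddingOverheadSketch is a hypothetical name.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;

final class PaddingOverheadSketch
{
  private PaddingOverheadSketch()
  {

  }

  /**
   * Encrypt the given plaintext with a freshly generated key and IV,
   * and return the length of the resulting ciphertext in bytes.
   */

  static int encryptedLength(final byte[] plain)
  {
    try {
      final KeyGenerator generator = KeyGenerator.getInstance("AES");
      generator.init(128);
      final SecretKey key = generator.generateKey();

      // CBC requires a fresh 16-byte IV for every message.
      final byte[] iv = new byte[16];
      new SecureRandom().nextBytes(iv);

      final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
      cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
      return cipher.doFinal(plain).length;
    } catch (final GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

A 21-byte message such as ECHO 12cbcf2e9c84b2f0 becomes 32 bytes of ciphertext under this scheme, and the 16-byte IV must also be conveyed to the receiver with each message.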

Hope is on the horizon, however. As of the time of writing (2018-04-10), support for protocol-level encryption is a planned feature in Aeron and should remove the need for application-level handling of crypto.


  1. Aeron is designed to service hundreds of clients as opposed to thousands. See System Design for information.↩︎

  2. The Subscription interface also contains many different variations on poll that provide for different semantics depending on the requirements of the application.↩︎

  3. The method also makes no attempt to check if the contents of the string would overflow the buffer. This could have spectacular consequences given the nature of Unsafe buffers.↩︎

  4. We use a fake:// URI scheme simply because the java.net.URI class requires a scheme of some description if we want it to parse the input into host and port segments.↩︎

  5. A pattern recommended by and for the Disruptor.↩︎

  6. The exception to this rule is when two clients (or possibly the server and the client) are on the same LAN. In this case, the attacker can use ARP spoofing to both read and effectively modify any and all packets. We do not attempt to preserve security in this case; doing so requires strong cryptography and would be overkill for this example.↩︎

  7. Aeron is designed to service hundreds of clients as opposed to thousands. See System Design for information.↩︎

  8. Strictly speaking, the server does not “reject” the client based on the session ID. It’s just that the application specifies that it is not interested in packets with anything other than a given session ID and therefore never sees packets that don’t belong to that session.↩︎

  9. It is critical for the security of a one-time pad that this value is generated afresh on each connection attempt and never reused for any other communication. The security of a one-time pad can be broken instantly by obtaining two different messages that were encrypted with the same value.↩︎

  10. While both appear to do essentially the same job, the EchoServerPortAllocator and EchoServerSessionAllocator classes use different algorithms due to the differences in the expected range of values to be allocated. The EchoServerPortAllocator class expects to pick numbers from a small range and therefore keeps the entire range in memory: It uses O(n) storage for a range of n values, but has guaranteed O(1) time complexity for both allocation and deallocation. The EchoServerSessionAllocator expects to pick numbers from a range of at least 2 ^ 31 and so does not keep a list of all of the unused numbers in memory. It uses only O(m) storage where m is the number of allocated values, but has a worst-case O(n) time complexity for allocation when the number of allocated values approaches n.↩︎