D10093 - Micro Stream, Originator/Coordinator/Terminator

This chapter we’ll introduce a complex example of stream mode, based on this example you can build basic mesh service data flow in zero system.

1. Services

Demo Projects and environment

Http Port Ipc Port Ipc Service Name Project Role
6100 up-athena Api Gateway
6301 6311 ipc-epimetheus up-epimetheus Originator
6401 6411 ipc-coeus up-coeus Coordinator A
6402 6412 ipc-crius up-crius Coordinator B
6501 6511 ipc-hecate up-hecate Terminator

2. Source Code

2.1. HuttApi ( service: up-epimetheus )

package up.god.micro.mesh;

import io.vertx.up.annotations.Address;
import io.vertx.up.annotations.EndPoint;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

@EndPoint
@Path("/api")
public interface HuttApi {

    @Path("ipc/mesh/hutt/{name}")
    @GET
    @Address("ZERO://IPC/NODE/HUTT")
    String sayHutt(@PathParam("name") String name);
}

2.2. HuttWorker ( service: up-epimetheus )

package up.god.micro.mesh;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.up.unity.Ux;
import io.vertx.up.annotations.Address;
import io.vertx.up.annotations.Ipc;
import io.vertx.up.annotations.Queue;
import io.vertx.up.commune.Envelop;

@Queue
public class HuttWorker {

    @Address("ZERO://IPC/NODE/HUTT")
    @Ipc(to = "RPC://IPC/NODE/HUTT", name = "ipc-coeus")
    public Future<JsonObject> sayHutt(final Envelop envelop) {
        final String name = Ux.getString(envelop);
        return Future.succeededFuture(new JsonObject()
                .put("name", name)
                .put("originator", "ipc-epimetheus"));
    }
}

2.3. HuttInsider ( service: up-coeus )

package up.god.ipc.mesh;

import io.vertx.core.json.JsonObject;
import io.vertx.up.annotations.Ipc;
import io.vertx.up.commune.Envelop;

public class HuttInsider {
    @Ipc(value = "RPC://IPC/NODE/HUTT",
            to = "RPC://IPC/NODE/HUTT1", name = "ipc-crius")
    public String next(final Envelop envelop) {
        final JsonObject data = envelop.data();
        data.put("coordinator1", "ipc-coeus");
        return data.encode();
    }
}

2.4. HuttInsider ( service: up-crius )

package up.god.ipc.mesh;

import io.vertx.core.json.JsonObject;
import io.vertx.up.annotations.Ipc;
import io.vertx.up.commune.Envelop;

public class HuttInsider {
    @Ipc(value = "RPC://IPC/NODE/HUTT1",
            to = "RPC://IPC/NODE/HUTTS", name = "ipc-hecate")
    public JsonObject next(final Envelop envelop) {
        final String content = envelop.data();
        return new JsonObject(content)
                .put("coordinator2", "ipc-cronus");
    }

}

2.5. HuttInsider ( service: up-hecate )

package up.god.ipc.mesh;

import io.vertx.core.json.JsonObject;
import io.vertx.up.annotations.Ipc;
import io.vertx.up.commune.Envelop;

public class HuttInsider {

    @Ipc(value = "RPC://IPC/NODE/HUTTS")
    public JsonObject sayHutt(final Envelop envelop) {
        final JsonObject data = envelop.data();
        return data.put("terminator", "ipc-hecate")
                .put("type", "async");
    }
}

Here please be careful about different role configuration for stream, you can refer D10083 - Micro, Rpc Mode to check different role configuration tutorials.

3. Testing

Here you must start five services and mount to Api Gateway and then you can test current demo as following:

URL : http://localhost:6100/api/ipc/mesh/hutt/huan1

Method : GET

Response :

{
    "data": {
        "name": "huan1",
        "originator": "ipc-epimetheus",
        "coordinator1": "ipc-coeus",
        "coordinator2": "ipc-cronus",
        "terminator": "ipc-hecate",
        "type": "async"
    }
}

4. Summary

Based on above response data you can see the data flow went through four nodes, the point that you should know is that in this mode the data flow could not be revert, it means that the direction is Single, you can pass the data to next service node only.