Micro Stream, Consumer ( Worker ) to Terminator
From this chapter we’ll focus on rpc stream mode, this mode we’ll meet different role ( method ) such
as Originator, Coordinator, Terminator etc. This chapter tutorial is different
from D10088 - Micro Stream, Sender ( Agent ) to Terminator,
- In D10088, The
Originatoris Sender ( Agent ), it means that the request came fromeventloopof vert.x; - In current tutorial, The
Originatoris Consumer ( Worker ), it means that the request came fromworkerpoolof vert.x;
1. Services

Demo Projects and environment
| Http Port | Ipc Port | Ipc Service Name | Project | Role |
|---|---|---|---|---|
| 6100 | – | – | up-athena | Api Gateway |
| 6301 | 6311 | ipc-epimetheus | up-epimetheus | Originator |
| 6501 | 6511 | ipc-hecate | up-hecate | Terminator |
2. Source Code
2.1. ConsumeApi ( service: up-epimetheus )
package up.god.micro.worker;
import io.vertx.up.annotations.Address;
import io.vertx.up.annotations.EndPoint;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@EndPoint
@Path("/api")
public interface ConsumeApi {
@Path("ipc/stream/consumer1/{name}")
@GET
@Address("ZERO://IPC/NODE/WORKER1")
String sayWorker1(@PathParam("name") String name);
@Path("ipc/stream/consumer2/{name}")
@GET
@Address("ZERO://IPC/NODE/WORKER2")
String sayWorker2(@PathParam("name") String name);
@Path("ipc/stream/consumer3/{name}")
@GET
@Address("ZERO://IPC/NODE/WORKER3")
String sayWorker3(@PathParam("name") String name);
}
2.2. ConsumeWorker ( service: up-epimetheus )
Future mode used in current tutorials
package up.god.micro.worker;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.up.unity.Ux;
import io.vertx.up.annotations.Address;
import io.vertx.up.annotations.Ipc;
import io.vertx.up.annotations.Queue;
import io.vertx.up.commune.Envelop;
@Queue
public class ConsumeWorker {
@Address("ZERO://IPC/NODE/WORKER1")
@Ipc(to = "RPC://IPC/NODE/WORKER1", name = "ipc-hecate")
public Future<JsonObject> worker1(final Envelop envelop) {
final String name = Ux.getString(envelop);
return Future.succeededFuture(new JsonObject()
.put("name", name)
.put("originator", "ipc-epimetheus"));
}
@Address("ZERO://IPC/NODE/WORKER2")
@Ipc(to = "RPC://IPC/NODE/WORKER2", name = "ipc-hecate")
public Future<JsonObject> worker2(final Envelop envelop) {
final String name = Ux.getString(envelop);
return Future.succeededFuture(new JsonObject()
.put("name", name)
.put("originator", "ipc-epimetheus"));
}
@Address("ZERO://IPC/NODE/WORKER3")
@Ipc(to = "RPC://IPC/NODE/WORKER3", name = "ipc-hecate")
public Future<JsonObject> worker3(final Envelop envelop) {
final String name = Ux.getString(envelop);
return Future.succeededFuture(new JsonObject()
.put("name", name)
.put("originator", "ipc-epimetheus"));
}
}
Here are some rules when you use current tutorial knowledge to write worker:
- In Sender ( Agent ), the annotations
@Ipcand@Addresscould not be used at the same time, but here it’s allowed. - The method must contains return value, you should not use
voidfor the consumer method. - In current tutorials, the return type of
Consumermethods are allFuture<T>instead of others.
2.3. WorkerInsider ( service: up-hecate )
package up.god.ipc;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.up.annotations.Ipc;
import io.vertx.up.commune.Envelop;
public class WorkerInsider {
@Ipc("RPC://IPC/NODE/WORKER1")
public JsonObject sayWorker1(final Envelop envelop) {
final JsonObject data = envelop.data();
return data.put("terminator", "ipc-hecate")
.put("worker", "worker1");
}
@Ipc("RPC://IPC/NODE/WORKER2")
public Envelop sayWorker2(final Envelop envelop) {
final JsonObject data = envelop.data();
return Envelop.success(data
.put("terminator", "ipc-hecate")
.put("worker", "worker2"));
}
@Ipc("RPC://IPC/NODE/WORKER3")
public Future<JsonObject> sayWorker3(final Envelop envelop) {
final JsonObject data = envelop.data();
return Future.succeededFuture(data
.put("terminator", "ipc-hecate")
.put("worker", "worker3"));
}
}
3. Testing
Then when you start three services, you can do the testing:
3.1. Case 1
URL : http://localhost:6100/api/ipc/stream/consumer1/huan1
Method : GET
Response :
{
"data": {
"name": "huan1",
"originator": "ipc-epimetheus",
"terminator": "ipc-hecate",
"worker": "worker1"
}
}
3.2. Case 2
URL : http://localhost:6100/api/ipc/stream/consumer2/huan2
Method : GET
Response :
{
"data": {
"name": "huan2",
"originator": "ipc-epimetheus",
"terminator": "ipc-hecate",
"worker": "worker2"
}
}
3.3. Case 3
URL : http://localhost:6100/api/ipc/stream/consumer3/huan3
Method : GET
Response :
{
"data": {
"name": "huan3",
"originator": "ipc-epimetheus",
"terminator": "ipc-hecate",
"worker": "worker3"
}
}
4. Summary
Here we provide three demos to show how to write the method in Rpc Server, except the input data type Envelop, you
can choose different return type of the methods as you want. The method signature is as following:
public Future<JsonObject> worker1(final Envelop envelop)