
Reactive Microservices: Increasing Capacity and Performance

Learn about reactive microservices in this article by Denis Kolodin, the core developer of Yew, a modern Rust framework. Denis has been developing high-load network applications for more than 12 years, writes full-stack apps in Rust at his job, and is known for his unique problem-solving capabilities in Rust.

If you adhere to a microservices architecture for your application, you'll get the benefit of loose coupling: every microservice is standalone enough to be developed and maintained by a separate team. This asynchronous approach to business tasks is not the only benefit; you can also increase capacity and performance by scaling only the microservices that take a heavy load. To achieve this, your microservices have to be reactive and self-sustaining, interacting with each other via message passing.

If you want to connect microservices to each other directly, you can use RPCs to allow the functions of one service to be called remotely by another service. In this article, you'll learn to use remote procedure calls (RPCs) in Rust with the jsonrpc-http-server crate. You can find the sources of the examples in the GitHub repository.

There are a lot of RPC frameworks with different formats and speed potential. Let's look at some popular protocols.

JSON-RPC

The JSON-RPC protocol uses messages serialized to JSON format. It uses a special format for requests and responses and can use different transports, such as HTTP, Unix sockets, or even stdio.

gRPC

The gRPC protocol was created by Google and uses the Protocol Buffers serialization format for messages. The protocol also relies on the benefits of HTTP/2 transport, which allows you to achieve excellent performance.

Thrift

Apache Thrift is a binary communication protocol developed by Facebook. Despite the fact that the protocol is binary, there are a lot of supported languages, such as C++, Go, and Java. Supported transports are file, memory, and socket.


Other RPC Frameworks

There are other RPC protocols, such as Cap'n Proto, XML-RPC, and even vintage SOAP. Some have implementations for Rust, but it is recommended that you choose between JSON-RPC, gRPC, and Thrift, because they are the most commonly used for microservices.

RPC and REST


You may ask whether it is possible to implement reactive microservices with a REST API or a traditional web API. Of course, yes! You can do it in one of two ways:

There are gateways that translate REST requests to JSON-RPC or other protocols. For example, gRPC has one ready to use. You can even write your own gateway—it's not so hard for simple or specific cases.

You can use a Web API to send messages from one server to another. A microservice doesn't have to have a single API path, but you can add a special handler for messages in JSON or other formats. For transport, you can use not only HTTP but also the WebSocket protocol.

Reactive Manifesto

If you look at reactive architecture as a standardized approach, you won't find a guide or a set of rules for turning your microservice reactive, but there is The Reactive Manifesto. It contains a list of principles you can draw on for ideas to improve your application.

Now, you can create an example of a reactive microservice for the JSON-RPC protocol.

Understanding JSON-RPC

There are some crates that provide functionality to support the JSON-RPC protocol. Mostly, crates support only the server or the client side, not both. Some crates don't support asynchronous computations either.

How JSON-RPC Works


The JSON-RPC protocol uses JSON messages in the following format for a request:

{"jsonrpc": "2.0", "method": "substring", "params": [2, 6, "string"], "id": 1}

The preceding JSON message calls the substring remote method of a server that can return a result like this:

{"jsonrpc": "2.0", "result": "ring", "id": 1}
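For illustration, a server-side handler matching the semantics of this example call might look like the following sketch (the substring method is purely hypothetical; it isn't defined by the protocol):

```rust
// Hypothetical handler for the `substring` method from the example request:
// return the characters of `s` in the half-open range [from, to).
fn substring(s: &str, from: usize, to: usize) -> String {
    s.chars().skip(from).take(to.saturating_sub(from)).collect()
}
```

Calling substring("string", 2, 6) produces "ring", the result carried in the response message above.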

It's worth noting that a client determines the identifier of the request and has to track those values. Servers are ID-agnostic and they use a connection to track requests.
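A minimal sketch of such client-side ID tracking might look like this (the IdTracker type and its methods are hypothetical illustrations, not part of any JSON-RPC crate):

```rust
use std::collections::HashMap;

// Hypothetical client-side bookkeeping: the client allocates a fresh id for
// every request and remembers which method call the id belongs to, so an
// incoming response can be matched back to its request.
struct IdTracker {
    next_id: u64,
    pending: HashMap<u64, String>, // id -> method name awaiting a response
}

impl IdTracker {
    fn new() -> Self {
        IdTracker { next_id: 1, pending: HashMap::new() }
    }

    // Issue a new id for an outgoing call to `method`.
    fn issue(&mut self, method: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.pending.insert(id, method.to_string());
        id
    }

    // Match a response id back to the method it was issued for.
    fn resolve(&mut self, id: u64) -> Option<String> {
        self.pending.remove(&id)
    }
}
```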

There are two versions of the protocol, 1.0 and 2.0. They are similar, but in the second version there is a separation of the client and the server. Version 2.0 is also transport independent, whereas the first version used connection events to determine behavior. There are improvements to errors and parameters as well. You should use version 2.0 for new projects.

To support JSON-RPC, your server has to respond to these JSON requests. The protocol is really simple to implement, but here you'll use the jsonrpc-http-server crate, which uses HTTP transport and provides types to bootstrap a server.
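To see just how little the format requires, here is a toy sketch that builds a success response body by hand (only an illustration; a real server should rely on a JSON library to handle escaping):

```rust
// Build a JSON-RPC 2.0 success response by hand. A real implementation
// should use a JSON serializer to handle escaping correctly.
fn make_response(result: &str, id: u64) -> String {
    format!(r#"{{"jsonrpc": "2.0", "result": "{}", "id": {}}}"#, result, id)
}
```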

Creating A Microservice

Now, you'll create an example of a microservice that supports the JSON-RPC protocol and has two methods. The microservice will support working as part of a ring of microservices: you'll send a message to one microservice, which will send a message to the next microservice in the ring, and that microservice will pass the message further. If this is implemented incorrectly, your microservices will block, because they can't process requests in parallel the way reactive services must.

Dependencies

First, import the necessary dependencies:

failure = "0.1"
jsonrpc = { git = "https://github.com/apoelstra/rust-jsonrpc" }
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc" }
log = "0.4"
env_logger = "0.6"
serde = "1.0"
serde_derive = "1.0"

You are likely familiar with most of these crates, except jsonrpc and jsonrpc-http-server. The first is a JSON-RPC client based on the hyper crate. The second also uses the hyper crate and provides the server-side functionality of JSON-RPC. Import the necessary types:

use failure::Error;
use jsonrpc::client::Client;
use jsonrpc::error::Error as ClientError;
use jsonrpc_http_server::ServerBuilder;
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Error as ServerError, Value};
use log::{debug, error, trace};
use serde::Deserialize;
use std::env;
use std::fmt;
use std::net::SocketAddr;
use std::sync::Mutex;
use std::sync::mpsc::{channel, Sender};
use std::thread;

The jsonrpc crate has the Client type that you'll use to call the remote methods of other services. You've also imported Error from that crate as ClientError to avoid a name conflict with Error from the failure crate.

For the server side, use ServerBuilder from the jsonrpc-http-server crate. You also import Error from that crate, renamed to ServerError. To implement function handlers, you need to import IoHandler, which can be used to attach functions as RPC methods. Also, you need Value (actually, this type is re-exported from the serde_json crate), which is used as the result type for RPC methods.

To avoid mistakes in the method names, since you'll use them twice, once in the server implementation and again in the client, declare the names as string constants:

const START_ROLL_CALL: &str = "start_roll_call";
const MARK_ITSELF: &str = "mark_itself";

The first method will start sending messages from one microservice to the next. The second method is used to stop this roll-calling process.

Client

To interact with other microservice instances and call their remote methods, create a separate struct, because that's more convenient than using the jsonrpc Client directly. In any case, you use that type internally in your struct:

struct Remote {
    client: Client,
}

Use the Remote struct to make calls to remote services. To create the struct, use the following constructor:

impl Remote {
    fn new(addr: SocketAddr) -> Self {
        let url = format!("http://{}", addr);
        let client = Client::new(url, None, None);
        Self { client }
    }
}

The Client constructor expects a String URL as a parameter, so a URL is built from the provided SocketAddr. You'll also need a generic function that uses the Client instance to call remote methods. Add the call_method method to the implementation of the Remote struct:

fn call_method<T>(&self, meth: &str, args: &[Value]) -> Result<T, ClientError>
where
    T: for<'de> Deserialize<'de>,
{
    let request = self.client.build_request(meth, args);
    self.client.send_request(&request).and_then(|res| res.into_result::<T>())
}

Calling a JSON-RPC method with the jsonrpc crate is simple: use the build_request method of the Client instance to create a Request and send it with the send_request method of the same Client. There is also a method called do_rpc that does this in a single call. The more verbose approach is used here to show that you can predefine requests and reuse them to speed up call preparation. It's also more pleasant to use business-oriented struct methods instead of a raw Client, and isolating the implementation behind a wrapper that hides the details of RPC calls pays off if you later decide to switch to another protocol, such as gRPC.
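One way to keep such a protocol switch cheap is to put the business-oriented methods behind a trait, so callers never see the transport. This is a hedged sketch with hypothetical names (RingApi and StubApi are not part of the article's code):

```rust
// A business-oriented interface that hides the RPC details. A JSON-RPC
// client, a gRPC client, or a test stub can all implement it.
trait RingApi {
    fn start_roll_call(&self) -> Result<bool, String>;
    fn mark_itself(&self) -> Result<bool, String>;
}

// A stub implementation that always succeeds, useful for tests.
struct StubApi;

impl RingApi for StubApi {
    fn start_roll_call(&self) -> Result<bool, String> {
        Ok(true)
    }
    fn mark_itself(&self) -> Result<bool, String> {
        Ok(true)
    }
}
```

Code written against the trait keeps working no matter which transport sits behind it.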

Add special methods to the Remote struct implementation to make calls using the call_method method. First, you need the start_roll_call function:

fn start_roll_call(&self) -> Result<bool, ClientError> {
    self.call_method(START_ROLL_CALL, &[])
}

It doesn't pass any parameters with the call, but it expects a bool result. A constant is used for the remote method's name. Add the mark_itself method to the Remote struct:

fn mark_itself(&self) -> Result<bool, ClientError> {
    self.call_method(MARK_ITSELF, &[])
}

It doesn't send any parameters either and returns a bool value. Now you can add a worker to separate outgoing method calls from incoming calls.

Worker

Since you have two methods, you’ll add a struct to perform remote calls of these methods from a worker thread. Add the Action enumeration to the code:

enum Action {
    StartRollCall,
    MarkItself,
}

It has two variants: StartRollCall to perform the remote start_roll_call method call and the MarkItself variant to call the remote mark_itself method.

Now you can add a function to spawn a worker in a separate thread. If you perform outgoing calls immediately in incoming method handlers, you can block the execution, because you have a ring of microservices and blocking one microservice will block the whole ring's interaction.

Look at the spawn_worker function:

fn spawn_worker() -> Result<Sender<Action>, Error> {
    let (tx, rx) = channel();
    let next: SocketAddr = env::var("NEXT")?.parse()?;
    thread::spawn(move || {
        let remote = Remote::new(next);
        let mut in_roll_call = false;
        for action in rx.iter() {
            match action {
                Action::StartRollCall => {
                    if !in_roll_call {
                        if remote.start_roll_call().is_ok() {
                            debug!("ON");
                            in_roll_call = true;
                        }
                    } else {
                        if remote.mark_itself().is_ok() {
                            debug!("OFF");
                            in_roll_call = false;
                        }
                    }
                }
                Action::MarkItself => {
                    if in_roll_call {
                        if remote.mark_itself().is_ok() {
                            debug!("OFF");
                            in_roll_call = false;
                        }
                    } else {
                        debug!("SKIP");
                    }
                }
            }
        }
    });
    Ok(tx)
}

This function creates a channel and spawns a new thread with a routine that processes all received messages from a channel. Create the Remote instance with the address extracted from the NEXT environment variable.

There is a flag that shows that the start_roll_call method has been called. Set it to true when the StartRollCall message is received and the start_roll_call method of the remote server is called. If the flag is already set to true and the routine received the StartRollCall message, the thread will call the mark_itself remote method. In other words, you’ll call the start_roll_call methods of all running service instances. When all services set the flag to true, you’ll call the mark_itself methods of all services.
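The flag-toggling logic can be modeled in isolation. The following is a simplified sketch of the worker's state machine in which the remote calls are assumed to always succeed; the returned labels mirror the log output:

```rust
enum Action {
    StartRollCall,
    MarkItself,
}

// Simplified model of the worker: remote calls are stubbed out as always
// succeeding, and `handle` returns the label the worker would log.
struct Worker {
    in_roll_call: bool,
}

impl Worker {
    fn new() -> Self {
        Worker { in_roll_call: false }
    }

    fn handle(&mut self, action: Action) -> &'static str {
        match action {
            Action::StartRollCall => {
                if !self.in_roll_call {
                    self.in_roll_call = true;
                    "ON"
                } else {
                    self.in_roll_call = false;
                    "OFF"
                }
            }
            Action::MarkItself => {
                if self.in_roll_call {
                    self.in_roll_call = false;
                    "OFF"
                } else {
                    "SKIP"
                }
            }
        }
    }
}
```

Feeding it StartRollCall, StartRollCall, MarkItself yields ON, OFF, SKIP, the same sequence that appears in one service's log when the roll call travels around the ring.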

Start a server and run a ring of services.

Server


The main function initializes a logger and spawns a worker. Then, you extract the ADDRESS environment variable and use its value to bind the server's socket. Look at the following code:

fn main() -> Result<(), Error> {
    env_logger::init();
    let tx = spawn_worker()?;
    let addr: SocketAddr = env::var("ADDRESS")?.parse()?;
    let mut io = IoHandler::default();

    let sender = Mutex::new(tx.clone());
    io.add_method(START_ROLL_CALL, move |_| {
        trace!("START_ROLL_CALL");
        let tx = sender.lock().map_err(to_internal)?;
        tx.send(Action::StartRollCall)
            .map_err(to_internal)
            .map(|_| Value::Bool(true))
    });

    let sender = Mutex::new(tx.clone());
    io.add_method(MARK_ITSELF, move |_| {
        trace!("MARK_ITSELF");
        let tx = sender.lock().map_err(to_internal)?;
        tx.send(Action::MarkItself)
            .map_err(to_internal)
            .map(|_| Value::Bool(true))
    });

    let server = ServerBuilder::new(io).start_http(&addr)?;
    Ok(server.wait())
}

To implement JSON-RPC methods, use the IoHandler struct. It has the add_method method, which expects the name of the method and a closure with the implementation of this method.

You’ve added two methods, start_roll_call and mark_itself, using constants as names for these methods. The implementation of these methods is simple: only prepare the corresponding Action messages and send them to the worker's thread.

The JSON-RPC method implementation has to return the Result<Value, ServerError> value. To convert any other errors to ServerError, use the following function:

fn to_internal<E: fmt::Display>(err: E) -> ServerError {
    error!("Error: {}", err);
    ServerError::internal_error()
}

The function only prints the current error message and creates an error with the InternalError code using the internal_error method of the ServerError type.

At the end of the main function, create a new ServerBuilder instance with the prepared IoHandler and start the HTTP server to listen for JSON-RPC requests using the start_http method.

Now start a ring of services to test it.

Compiling and Running


Compile this example with the cargo build subcommand, and start three instances of the service using the following commands (run every command in a separate terminal window to see the logs):

RUST_LOG=jsonrpc_ring=trace ADDRESS=127.0.0.1:4444 NEXT=127.0.0.1:5555 target/debug/jsonrpc-ring
RUST_LOG=jsonrpc_ring=trace ADDRESS=127.0.0.1:5555 NEXT=127.0.0.1:6666 target/debug/jsonrpc-ring
RUST_LOG=jsonrpc_ring=trace ADDRESS=127.0.0.1:6666 NEXT=127.0.0.1:4444 target/debug/jsonrpc-ring

When the three services are started, prepare and send a JSON-RPC call request with curl from another terminal window:

curl -H "Content-Type: application/json" --data-binary '{"jsonrpc":"2.0","id":"curl","method":"start_roll_call","params":[]}' http://127.0.0.1:4444

This command starts the interaction of all the services, and they will call each other in a ring. You will see logs from every service. The first prints something like this:

[2019-01-14T10:45:29Z TRACE jsonrpc_ring] START_ROLL_CALL
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] ON
[2019-01-14T10:45:29Z TRACE jsonrpc_ring] START_ROLL_CALL
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] OFF
[2019-01-14T10:45:29Z TRACE jsonrpc_ring] MARK_ITSELF
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] SKIP

The second will print something like this:

[2019-01-14T10:45:29Z TRACE jsonrpc_ring] START_ROLL_CALL
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] ON
[2019-01-14T10:45:29Z TRACE jsonrpc_ring] MARK_ITSELF
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] OFF

And the third will output the following logs:

[2019-01-14T10:45:29Z TRACE jsonrpc_ring] START_ROLL_CALL
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] ON
[2019-01-14T10:45:29Z TRACE jsonrpc_ring] MARK_ITSELF
[2019-01-14T10:45:29Z DEBUG jsonrpc_ring] OFF

All services work as independent participants in the process, reacting to incoming messages and sending messages to other services when there is something to send.

If you found this article interesting, you can explore Hands-On Microservices with Rust, a comprehensive guide in developing and deploying high-performance microservices with Rust. Hands-On Microservices with Rust describes web development using the Rust programming language and will get you up and running with modern web frameworks and crates with examples of RESTful microservices creation.