Saturday, July 23, 2016

When things may get out of control: circuit breakers in practice. Apache Zest and Akka.

In the previous post we started the discussion about circuit breakers and why this pattern has gained so much importance these days. We learned about Netflix Hystrix, the most advanced circuit breaker implementation for the JVM platform, and its typical integration scenarios. In this post we are going to continue exploring the other available options, starting with the Apache Zest library.

Surprisingly, Apache Zest, being certainly a gem, is neither well known nor widely used. It is a framework for domain-centric application development which aims to explore the composite-oriented programming paradigm. Its roots go back to 2007, when it was born under another name, Qi4j (it became Apache Zest in 2015). It would take a complete book just to go through Apache Zest's features and concepts, but what we are interested in is the fact that Apache Zest has a simple circuit breaker implementation.

Let us use the same example, consuming the https://freegeoip.net/ REST(ful) web API, and wrap the communication with this external service using CircuitBreaker from Apache Zest:

public class GeoIpService {
    private static final String URL = "http://freegeoip.net/";
    private final ObjectMapper mapper = new ObjectMapper();
    private final CircuitBreaker breaker = new CircuitBreaker(5, 1000 * 120);
 
    public GeoIpDetails getDetails(final String host) {
        try {
            if (breaker.isOn()) {
                GeoIpDetails details = mapper.readValue(get(host), GeoIpDetails.class);
                breaker.success();
                return details;
            } else {
                // Fallback to empty response
                return new GeoIpDetails();
            }
        } catch (final IOException ex) {
            breaker.throwable(ex);
            throw new RuntimeException("Communication with '" + URL + "' failed", ex);
        } catch (final URISyntaxException ex) {
            // Should never happen, but just trip circuit breaker immediately
            breaker.trip(); 
            throw new RuntimeException("Invalid service endpoint: " + URL, ex);
        }
    }

    private String get(final String host) throws IOException, URISyntaxException {
        return Request
            .Get(new URIBuilder(URL).setPath("/json/" + host).build())
            .connectTimeout(1000)
            .socketTimeout(3000)
            .execute()
            .returnContent()
            .asString();
    }
}

Essentially, this is about as basic a use of CircuitBreaker as it could possibly get. We configured it with a threshold of 5 failures (which in our case means failing requests) and a sleeping window of 2 minutes (120 * 1000 milliseconds). It becomes the responsibility of the application developer to report successes and failures using the success() and throwable(...) methods respectively, with the option to open the circuit breaker immediately by calling trip(). Please note that CircuitBreaker relies on Java synchronization mechanisms and is thread-safe.
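To make the threshold and the sleeping window more tangible, here is a minimal sketch of a breaker with the same kind of on / off behavior. It is not the Apache Zest implementation: the class SimpleBreaker, its injectable clock and its internal bookkeeping are assumptions made for illustration, and only the method names mirror the ones used in the snippet above.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified breaker; NOT the Apache Zest implementation. */
final class SimpleBreaker {
    private final int threshold;        // consecutive failures tolerated before tripping
    private final long timeoutMillis;   // sleeping window: how long the breaker stays off
    private final LongSupplier clock;   // injectable time source, in milliseconds
    private int failures;               // failures seen since the last success
    private boolean on = true;          // "on" means requests are allowed through
    private long trippedAt;             // clock value at the moment of the last trip

    SimpleBreaker(int threshold, long timeoutMillis, LongSupplier clock) {
        this.threshold = threshold;
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    /** Returns true when requests may go through. */
    synchronized boolean isOn() {
        if (!on && clock.getAsLong() - trippedAt >= timeoutMillis) {
            // The sleeping window has elapsed: let requests through again
            on = true;
            failures = 0;
        }
        return on;
    }

    /** Reports a successful request and clears the failure count. */
    synchronized void success() {
        failures = 0;
    }

    /** Reports a failed request; trips the breaker once the threshold is reached. */
    synchronized void throwable(Throwable cause) {
        if (++failures >= threshold) {
            trip();
        }
    }

    /** Turns the breaker off immediately. */
    synchronized void trip() {
        on = false;
        trippedAt = clock.getAsLong();
    }
}

final class SimpleBreakerDemo {
    public static void main(String[] args) {
        SimpleBreaker breaker = new SimpleBreaker(5, 120_000, System::currentTimeMillis);
        for (int i = 0; i < 5; i++) {
            breaker.throwable(new java.io.IOException("simulated failure"));
        }
        System.out.println("Breaker is on: " + breaker.isOn());
    }
}
```

Passing the clock in as a parameter is purely a design choice of this sketch: it lets the sleeping window be exercised without actually waiting two minutes.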

Interestingly, the CircuitBreaker from Apache Zest uses slightly different conventions: instead of closed / open states it speaks of on / off ones (on means requests are allowed through, which corresponds to closed; off corresponds to open). Those are more familiar to most of us. And to finish up, basic JMX instrumentation is also available out of the box.

It takes a couple of lines added to the GeoIpService initialization (in the constructor, for example) to register and expose the managed beans:

public GeoIpService() throws Exception {
    final ObjectName name = new ObjectName("circuit-breakers", 
        "zest-circuit-breaker", "freegeoip.net");

    ManagementFactory
        .getPlatformMBeanServer()
        .registerMBean(new CircuitBreakerJMX(breaker, name), name);
}

Please do not hesitate to glance through the official Apache Zest Circuit Breaker documentation; there are quite a few use cases you may find useful for your projects. The complete example is available on Github.

If you are developing on the JVM using the Scala programming language, you are certainly a lucky one, as there is a native circuit breaker implementation available as part of the Akka toolkit. For example, let us redesign our Geo IP service consumer as a typical Akka actor which makes an HTTP call to https://freegeoip.net/:

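import akka.actor.{Actor, ActorLogging}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.{CircuitBreaker, pipe}
import akka.stream.ActorMaterializer

import scala.concurrent.Future
import scala.concurrent.duration._

// Note: Unmarshal(...).to[GeoIpDetails] below also needs an implicit
// unmarshaller for GeoIpDetails in scope (not shown in this snippet).
case class GeoIp(host: String)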
case class GeoIpDetails(ip: String = "", countryCode: String = "", 
  countryName: String = "", latitude: Double = 0, longitude: Double = 0)

class GeoIpActor extends Actor with ActorLogging {
  import spray.json._
  import spray.json.DefaultJsonProtocol._
  
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  
  import context.dispatcher
  import context.system
  
  val breaker = new CircuitBreaker(
    context.system.scheduler,
    maxFailures = 5,
    callTimeout = 15 seconds,
    resetTimeout = 2 minutes)
  
  def receive = {
    case GeoIp(host) => breaker
      .withCircuitBreaker(Http()
        .singleRequest(HttpRequest(uri = s"http://freegeoip.net/json/$host"))
        .flatMap {
          case HttpResponse(OK, _, entity, _) => Unmarshal(entity).to[GeoIpDetails]
          case _ => Future.successful(GeoIpDetails())
        }
      ) pipeTo sender()
  }
}

By now, the pattern undoubtedly looks familiar to all of us. The only new option which Akka's CircuitBreaker brings to the table is an overall call timeout: the execution is considered failed when it does not complete within this time period (certainly a very handy addition to the circuit breaker capabilities). The withCircuitBreaker function takes care of managing the circuit breaker state around the wrapped block of code.
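The call timeout idea can be sketched in plain Java as well. The snippet below is only an illustration under stated assumptions: TimedCalls and its methods are hypothetical names, the call blocks the calling thread (Akka's CircuitBreaker works with futures and does not block), and a slow call is simply counted as one more failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not part of Akka: treats slow calls as failed calls. */
final class TimedCalls {
    private final ExecutorService executor;
    private final long callTimeoutMillis;
    private final AtomicInteger failures = new AtomicInteger();

    TimedCalls(ExecutorService executor, long callTimeoutMillis) {
        this.executor = executor;
        this.callTimeoutMillis = callTimeoutMillis;
    }

    /** Runs the task; returns the fallback if it throws or exceeds the call timeout. */
    <T> T call(Callable<T> task, T fallback) {
        Future<T> future = executor.submit(task);
        try {
            T result = future.get(callTimeoutMillis, TimeUnit.MILLISECONDS);
            failures.set(0);              // a timely success clears the failure count
            return result;
        } catch (TimeoutException ex) {
            future.cancel(true);          // too slow: interrupt the task, count a failure
            failures.incrementAndGet();
            return fallback;
        } catch (ExecutionException ex) {
            failures.incrementAndGet();   // the task itself threw an exception
            return fallback;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    /** Number of consecutive failed or timed-out calls. */
    int failureCount() {
        return failures.get();
    }
}

final class TimedCallsDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TimedCalls calls = new TimedCalls(pool, 200);
            String result = calls.call(() -> {
                Thread.sleep(5_000);      // simulates a hanging remote service
                return "details";
            }, "fallback");
            System.out.println(result + ", failures: " + calls.failureCount());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A real breaker would feed this failure count into its threshold check; here it is only exposed through failureCount() to keep the sketch short.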

The interactions with the GeoIpActor are no different from any other Akka actor:

implicit val system = ActorSystem("circuit-breakers")
implicit val timeout: Timeout = 5 seconds

val geoIp = system.actorOf(Props[GeoIpActor], "geo-ip-actor")
val result = (geoIp ? GeoIp("8.8.8.8")).mapTo[GeoIpDetails]
  
result andThen { 
  case Success(details) => log.info("GEO IP: {}", details)
  case Failure(ex) => log.error("Communication error", ex)
} 

Looking a bit deeper into the CircuitBreaker documentation, we can get some insight into its internals. There are actually three states the CircuitBreaker can be in: open, closed and half-open. The half-open state exists to let just a single call through, to check whether the invocation is back to normal operation or not.
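The three states can be captured by a small state machine. What follows is a sketch under assumptions, not Akka's actual code: ThreeStateBreaker, its method names and the injectable clock are hypothetical, and the transitions follow the description above (open after too many failures, half-open once the reset timeout has elapsed, a single trial call deciding between closed and open).

```java
import java.util.function.LongSupplier;

/** Hypothetical three-state breaker; a sketch, not Akka's implementation. */
final class ThreeStateBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private final LongSupplier clock;   // injectable time source, in milliseconds
    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    ThreeStateBreaker(int maxFailures, long resetTimeoutMillis, LongSupplier clock) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
        this.clock = clock;
    }

    /** Decides whether a call may proceed right now. */
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= resetTimeoutMillis) {
            state = State.HALF_OPEN;    // let exactly one trial call through
            return true;
        }
        // While half-open, every call except the trial one is rejected
        return state == State.CLOSED;
    }

    /** A successful trial call closes the breaker; any success clears the count. */
    synchronized void recordSuccess() {
        failures = 0;
        if (state == State.HALF_OPEN) {
            state = State.CLOSED;
        }
    }

    /** A failed trial call, or too many failures while closed, opens the breaker. */
    synchronized void recordFailure() {
        if (state == State.HALF_OPEN
                || (state == State.CLOSED && ++failures >= maxFailures)) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
            failures = 0;
        }
    }

    synchronized State state() {
        return state;
    }
}

final class ThreeStateBreakerDemo {
    public static void main(String[] args) {
        ThreeStateBreaker breaker =
            new ThreeStateBreaker(5, 120_000, System::currentTimeMillis);
        for (int i = 0; i < 5; i++) {
            breaker.recordFailure();
        }
        System.out.println("State after 5 failures: " + breaker.state());
    }
}
```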

The code snippet looks perfect, but one thing to keep in mind is how the Actor Model and the absence of shared mutable state affect circuit breaker state synchronization. To facilitate that, Akka's CircuitBreaker has a rich set of callback notifications (like onOpen, onHalfOpen, onClose), so state changes can be broadcast between actors. The complete project sources are available on Github.
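The callback idea is easy to picture with a tiny listener registry. This is a hedged sketch rather than Akka's API: NotifyingBreaker and transitionTo are hypothetical names, only the callback names onOpen, onHalfOpen and onClose are borrowed from the text above, and in an actor-based system each callback would typically just send a message to the interested actors.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical breaker shell that only tracks state and notifies listeners. */
final class NotifyingBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final List<Runnable> openListeners = new CopyOnWriteArrayList<>();
    private final List<Runnable> halfOpenListeners = new CopyOnWriteArrayList<>();
    private final List<Runnable> closeListeners = new CopyOnWriteArrayList<>();
    private State state = State.CLOSED;

    NotifyingBreaker onOpen(Runnable callback) {
        openListeners.add(callback);
        return this;
    }

    NotifyingBreaker onHalfOpen(Runnable callback) {
        halfOpenListeners.add(callback);
        return this;
    }

    NotifyingBreaker onClose(Runnable callback) {
        closeListeners.add(callback);
        return this;
    }

    /** Moves to the given state and notifies listeners only on an actual change. */
    void transitionTo(State next) {
        List<Runnable> toNotify;
        synchronized (this) {
            if (state == next) {
                return;
            }
            state = next;
            toNotify = next == State.OPEN ? openListeners
                     : next == State.HALF_OPEN ? halfOpenListeners
                     : closeListeners;
        }
        // Callbacks run outside the lock so that a slow listener cannot block others
        toNotify.forEach(Runnable::run);
    }
}

final class NotifyingBreakerDemo {
    public static void main(String[] args) {
        NotifyingBreaker breaker = new NotifyingBreaker()
            .onOpen(() -> System.out.println("breaker opened"))
            .onHalfOpen(() -> System.out.println("breaker half-open"))
            .onClose(() -> System.out.println("breaker closed"));

        breaker.transitionTo(NotifyingBreaker.State.OPEN);
        breaker.transitionTo(NotifyingBreaker.State.HALF_OPEN);
        breaker.transitionTo(NotifyingBreaker.State.CLOSED);
    }
}
```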

For curious readers, just a few closing notes about the adoption of circuit breaker implementations. Netflix Hystrix is the number one choice at the moment, particularly (but not only) because of superior support from the Spring community. Obviously, Akka's CircuitBreaker is a natural choice for Scala developers who build their applications on top of the excellent Akka toolkit. As for the Apache Zest circuit breaker, it can be used as-is (if you want to fully control the behavior) or easily integrated as a useful extension into existing general-purpose clients. For example, Apache CXF allows configuring JAX-RS / JAX-WS clients with a Failover feature, including a circuit breaker-based implementation: CircuitBreakerFailoverFeature.

I hope this series of posts has extended your awareness of the circuit breaker pattern and the state of its available implementations on the JVM platform a little. The repository with complete project samples is available on Github.

Stay resilient!
