

Retrofit Call Adapter for Android RxJava Threading

Posted on 27 Nov 2016 in java, rxjava, retrofit, threading by Greg E.

Retrofit RxJava Call Pattern

If you are using Retrofit with RxJava observables, you are probably very familiar with this code:

api.getUser("grennis")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { ... }

This sets up the API call to execute on a thread from the IO scheduler, and runs your handler to process the results on the main thread using the RxAndroid scheduler.
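To make the two thread hops concrete, here is a rough plain-JDK analogy. It is not RxJava and not how Retrofit implements anything: the class name, fetchUser, and the "io" and "main-sim" thread names are all made up for illustration, and CompletableFuture stands in for the observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Hypothetical stand-in for the blocking network call behind api.getUser(...)
    static String fetchUser(String name) {
        return name + " fetched on " + Thread.currentThread().getName();
    }

    // Runs the work on one executor and the result handling on another
    static CompletableFuture<String> load(String name, Executor io, Executor main) {
        return CompletableFuture
                // plays the role of subscribeOn(Schedulers.io())
                .supplyAsync(() -> fetchUser(name), io)
                // plays the role of observeOn(AndroidSchedulers.mainThread())
                .thenApplyAsync(r -> r + ", handled on " + Thread.currentThread().getName(), main);
    }

    // Builds two single-thread executors, runs one call, and returns the trace
    static String demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        try {
            return load("grennis", io, main).join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // grennis fetched on io, handled on main-sim
    }
}
```

Every call site has to name both executors, which is the same repetition the two Rx operators cause.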

Well, you essentially always want the call to happen on an IO thread, and you almost always want the result on the main thread. The one exception I can think of is chaining calls together on a background thread, but in that case you don’t really need the observable pattern at all: just use the Retrofit convention that returns Call<T>.

This is code that is repeated everywhere. It’s boilerplate, does no good and can only lead to mistakes.

Update: also see this post for a much shorter solution using Kotlin.

Removing the subscribeOn

Recognizing that you always want the call to happen on the IO thread, the RxJava2CallAdapterFactory offers an overload to set this default when you set up Retrofit. Instead of calling

Retrofit.Builder()
    .baseUrl(...)
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())

You can call

Retrofit.Builder()
    .baseUrl(...)
    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))

Pass the IO scheduler to the adapter factory, and it will always be used for subscribeOn. The code in the adapter to make this happen is found here.

Removing the observeOn

The RxJava2CallAdapter does not offer a similar method to always observe on a specific scheduler, but it can be done by creating a new adapter that wraps the existing one and modifies the observable to do this:

@Override
public <R> Observable<?> adapt(Call<R> call) {
    Observable observable = (Observable) wrapped.adapt(call);

    // Always handle result on main thread
    return observable.observeOn(AndroidSchedulers.mainThread());
}

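The wrapping factory can also create the original factory with createWithScheduler, so that both cases are handled. Note that the cast assumes the service method returns an Observable; a method declared to return another Rx type, such as Single, would fail the cast and needs its own handling.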
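The wrapping idea can be sketched without any Rx or Retrofit dependency. Everything below is a hypothetical miniature: Source and Adapter are invented stand-ins for an observable and a call adapter, and executors stand in for schedulers. The inner adapter gets a default work executor, and the wrapper delegates to it and then moves delivery of the result to a second executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

class WrappingAdapterSketch {
    // Hypothetical stand-in for an observable: work starts when someone subscribes
    interface Source { void subscribe(Consumer<String> handler); }

    // Hypothetical stand-in for a call adapter: turns a blocking call into a Source
    interface Adapter { Source adapt(Supplier<String> call); }

    // Inner adapter with a default work executor baked in (the createWithScheduler idea)
    static Adapter withWorkExecutor(Executor work) {
        return call -> handler -> work.execute(() -> handler.accept(call.get()));
    }

    // Wrapper: delegates to the inner adapter, then re-posts each result to the delivery executor
    static Adapter deliveringOn(Adapter wrapped, Executor delivery) {
        return call -> handler -> wrapped.adapt(call)
                .subscribe(value -> delivery.execute(() -> handler.accept(value)));
    }

    // Wires both together and records which thread ran each step
    static String demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-sim"));
        try {
            Adapter adapter = deliveringOn(withWorkExecutor(io), main);
            CompletableFuture<String> done = new CompletableFuture<>();
            adapter.adapt(() -> "user loaded on " + Thread.currentThread().getName())
                   .subscribe(v -> done.complete(v + ", handled on " + Thread.currentThread().getName()));
            return done.join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // user loaded on io, handled on main-sim
    }
}
```

The call site in demo() names no executor at all. Both defaults live in the adapter, which is the effect the wrapper is after.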

RxThreadingCallAdapterFactory

The full class that eliminates the need for both boilerplate calls is here:

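// Imports assume RxJava 2, RxAndroid 2, and the standalone retrofit2-rxjava2-adapter artifact
import com.jakewharton.retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;

import java.lang.annotation.Annotation;
import java.lang.reflect.Type;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import retrofit2.Call;
import retrofit2.CallAdapter;
import retrofit2.Retrofit;

public class RxThreadingCallAdapterFactory extends CallAdapter.Factory {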
    private final RxJava2CallAdapterFactory original;

    private RxThreadingCallAdapterFactory() {
        // Always call on background thread
        original = RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io());
    }

    public static CallAdapter.Factory create() {
        return new RxThreadingCallAdapterFactory();
    }

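    @Override
    public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
        CallAdapter<?> wrapped = original.get(returnType, annotations, retrofit);
        // Not an Rx return type (e.g. Call<T>): return null so Retrofit tries the next factory
        return wrapped == null ? null : new RxCallAdapterWrapper(wrapped);
    }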

    private static class RxCallAdapterWrapper implements CallAdapter<Observable<?>> {
        private final CallAdapter<?> wrapped;

        public RxCallAdapterWrapper(CallAdapter<?> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public Type responseType() {
            return wrapped.responseType();
        }

        @Override
        public <R> Observable<?> adapt(Call<R> call) {
            Observable observable = (Observable) wrapped.adapt(call);

            // Always handle result on main thread
            return observable.observeOn(AndroidSchedulers.mainThread());
        }
    }
}

To use it, pass it to the Retrofit builder:

Retrofit.Builder()
    .baseUrl(...)
    .addCallAdapterFactory(RxThreadingCallAdapterFactory.create())

And now your API call can simply look like this:

api.getUser("grennis").subscribe { ... }

