Is it possible to show a progress bar when uploading an image via Retrofit 2?

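I'm currently using Retrofit 2 and I want to upload some photos to my server. I know that older versions used the TypedFile class for uploads. If we wanted a progress bar with it, we had to override the writeTo method in the TypedFile class.

Is it possible to show progress when using the Retrofit 2 library?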

As far as I can see in this post, no updates regarding the image upload progress response have been made, and you still have to override the writeTo method as shown in this SO answer, by creating a ProgressListener interface and using a subclass of TypedFile to override writeTo.

So, there isn't any built-in way to show progress when using the Retrofit 2 library.

First of all, you should use a Retrofit 2 version equal to or above 2.0 beta2. Second, create a new class that extends RequestBody:

public class ProgressRequestBody extends RequestBody {
private File mFile;
private String mPath;
private UploadCallbacks mListener;
private String content_type;


private static final int DEFAULT_BUFFER_SIZE = 2048;


public interface UploadCallbacks {
void onProgressUpdate(int percentage);
void onError();
void onFinish();
}

Take note: I added a content type parameter so the class can handle file types other than images.

public ProgressRequestBody(final File file, String content_type,  final  UploadCallbacks listener) {
this.content_type = content_type;
mFile = file;
mListener = listener;
}






@Override
public MediaType contentType() {
return MediaType.parse(content_type+"/*");
}


@Override
public long contentLength() throws IOException {
return mFile.length();
}


@Override
public void writeTo(BufferedSink sink) throws IOException {
long fileLength = mFile.length();
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
FileInputStream in = new FileInputStream(mFile);
long uploaded = 0;


try {
int read;
Handler handler = new Handler(Looper.getMainLooper());
while ((read = in.read(buffer)) != -1) {
uploaded += read;
sink.write(buffer, 0, read);


// update progress on UI thread, after the chunk has been written,
// so that the last update reports 100%
handler.post(new ProgressUpdater(uploaded, fileLength));
}
} finally {
in.close();
}
}


private class ProgressUpdater implements Runnable {
private long mUploaded;
private long mTotal;
public ProgressUpdater(long uploaded, long total) {
mUploaded = uploaded;
mTotal = total;
}


@Override
public void run() {
mListener.onProgressUpdate((int)(100 * mUploaded / mTotal));
}
}
}

Third, create the service interface:

@Multipart
@POST("/upload")
Call<JsonObject> uploadImage(@Part MultipartBody.Part file);

/* JsonObject above can be replaced with your own model; I just wanted to point this out. */

Now you can get progress of your upload. In your activity (or fragment):

class MyActivity extends AppCompatActivity implements ProgressRequestBody.UploadCallbacks {


ProgressBar progressBar;


@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);


progressBar = findViewById(R.id.progressBar);


// "image" is combined with "/*" in contentType(), giving the media type image/*
ProgressRequestBody fileBody = new ProgressRequestBody(file, "image", this);
MultipartBody.Part filePart =
MultipartBody.Part.createFormData("image", file.getName(), fileBody);


Call<JsonObject> request = RetrofitClient.uploadImage(filePart);


request.enqueue(new Callback<JsonObject>() {
@Override
public void onResponse(Call<JsonObject> call,   Response<JsonObject> response) {
if(response.isSuccessful()){
/* Here we can safely assume the file has been uploaded successfully. In my tests onFinish was not called consistently, whereas onProgressUpdate is reliable for updating the progress on the UI thread, so we can simply set the progress to 100% right here, since the upload has already finished. */
}
}


@Override
public void onFailure(Call<JsonObject> call, Throwable t) {
/* We can also stop the progress update here. I have not checked whether onError is called when the file could not be uploaded, so I use this as a fallback in case it is not. */
}
});


}


@Override
public void onProgressUpdate(int percentage) {
// set current progress
progressBar.setProgress(percentage);
}


@Override
public void onError() {
// do something on error
}


@Override
public void onFinish() {
// do something on upload finished,
// for example, start next uploading at a queue
progressBar.setProgress(100);
}


}
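
The heart of the ProgressRequestBody above is a chunked copy loop that reports progress after each chunk. Stripped of the Android and OkHttp types, the same idea can be sketched with plain java.io streams. This is only an illustration: ProgressCopy, copy and onPercent are hypothetical names that do not come from Retrofit or OkHttp, and in a real RequestBody the OutputStream would be the BufferedSink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.IntConsumer;

/** Hypothetical helper: copies a stream in chunks and reports percent done. */
final class ProgressCopy {
    private static final int BUFFER_SIZE = 2048;

    /** Copies in to out and reports the percentage after every chunk written. */
    static long copy(InputStream in, OutputStream out, long totalBytes,
                     IntConsumer onPercent) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        long copied = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            copied += read;
            // Report only after the write, so the last callback is 100.
            if (totalBytes > 0) {
                onPercent.accept((int) (100 * copied / totalBytes));
            }
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(data), sink, data.length,
                percent -> System.out.println(percent + "%"));
        System.out.println("copied " + copied + " bytes");
    }
}
```

On Android the callback would still have to be posted to the main thread (for example with a Handler, as in the answer above) before touching any views.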

Here's how to handle file upload progress with a simple POST rather than multipart. For multipart, check out @Yuriy's solution. Additionally, this solution uses content URIs instead of direct file references.

RestClient

@Headers({
"Accept: application/json",
"Content-Type: application/octet-stream"
})
@POST("api/v1/upload")
Call<FileDTO> uploadFile(@Body RequestBody file);

ProgressRequestBody

public class ProgressRequestBody extends RequestBody {
private static final String LOG_TAG = ProgressRequestBody.class.getSimpleName();


public interface ProgressCallback {
public void onProgress(long progress, long total);
}


public static class UploadInfo {
//Content uri for the file
public Uri contentUri;


// File size in bytes
public long contentLength;
}


private WeakReference<Context> mContextRef;
private UploadInfo mUploadInfo;
private ProgressCallback mListener;


private static final int UPLOAD_PROGRESS_BUFFER_SIZE = 8192;


public ProgressRequestBody(Context context, UploadInfo uploadInfo, ProgressCallback listener) {
mContextRef = new WeakReference<>(context);
mUploadInfo =  uploadInfo;
mListener = listener;
}


@Override
public MediaType contentType() {
// NOTE: We are posting the upload as binary data so we don't need the true mimeType
return MediaType.parse("application/octet-stream");
}


@Override
public void writeTo(BufferedSink sink) throws IOException {
long fileLength = mUploadInfo.contentLength;
byte[] buffer = new byte[UPLOAD_PROGRESS_BUFFER_SIZE];
InputStream in = in();
long uploaded = 0;


try {
int read;
while ((read = in.read(buffer)) != -1) {
mListener.onProgress(uploaded, fileLength);


uploaded += read;


sink.write(buffer, 0, read);
}
} finally {
in.close();
}
}


/**
* WARNING: You must override this function and return the file size or you will get errors
*/
@Override
public long contentLength() throws IOException {
return mUploadInfo.contentLength;
}


private InputStream in() throws IOException {
InputStream stream = null;
try {
stream = getContentResolver().openInputStream(mUploadInfo.contentUri);
} catch (Exception ex) {
Log.e(LOG_TAG, "Error getting input stream for upload", ex);
}


return stream;
}


private ContentResolver getContentResolver() {
if (mContextRef.get() != null) {
return mContextRef.get().getContentResolver();
}
return null;
}
}

To initiate the upload:

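// Describe the file to upload (UploadInfo has no constructor arguments, so set the fields)
ProgressRequestBody.UploadInfo uploadInfo = new ProgressRequestBody.UploadInfo();
uploadInfo.contentUri = myUri;
uploadInfo.contentLength = fileSize;


// Create a ProgressRequestBody for the file
ProgressRequestBody requestBody = new ProgressRequestBody(
getContext(),
uploadInfo,
new ProgressRequestBody.ProgressCallback() {
public void onProgress(long progress, long total) {
//Update your progress UI here
//You'll probably want to use a handler to run on UI thread
}
}
);


// Upload
// uploadFile() only returns a Call<FileDTO>; call enqueue() or execute() on it to start the request
mRestClient.uploadFile(requestBody);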

Warning: if you forget to override the contentLength() function, you may receive a few obscure errors:

retrofit2.adapter.rxjava.HttpException: HTTP 503 client read error

Or

Write error: ssl=0xb7e83110: I/O error during system call, Broken pipe

Or

javax.net.ssl.SSLException: Read error: ssl=0x9524b800: I/O error during system call, Connection reset by peer

These are a result of RequestBody.writeTo() being called multiple times as the default contentLength() is -1.

Anyways this took a long time to figure out, hope it helps.

Useful links: https://github.com/square/retrofit/issues/1217

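I update the progress bar in onProgressUpdate. The version below performs better because it only posts an update to the UI thread once the progress has advanced by more than one percentage point: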

@Override
public void writeTo(BufferedSink sink) throws IOException {
long fileLength = mFile.length();
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
FileInputStream in = new FileInputStream(mFile);
long uploaded = 0;


try {
int read;
Handler handler = new Handler(Looper.getMainLooper());
int num = 0;
while ((read = in.read(buffer)) != -1) {


int progress = (int) (100 * uploaded / fileLength);
if( progress > num + 1 ){
// update progress on UI thread
handler.post(new ProgressUpdater(uploaded, fileLength));
num = progress;
}


uploaded += read;
sink.write(buffer, 0, read);
}
} finally {
in.close();
}
}
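
The throttling idea above can also be pulled out of writeTo into a small helper, which makes it easier to reuse. The following is a minimal sketch in plain Java under that assumption; ThrottledProgress and onBytesWritten are hypothetical names, not part of Retrofit or OkHttp. It reports a value only when the whole-number percentage changes, so a large file produces at most 101 callbacks no matter how small the buffer is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical helper: reports progress only when the integer percentage changes. */
final class ThrottledProgress {
    private final long totalBytes;
    private final IntConsumer listener;
    private long uploadedBytes = 0;
    private int lastReported = -1; // nothing reported yet

    ThrottledProgress(long totalBytes, IntConsumer listener) {
        this.totalBytes = totalBytes;
        this.listener = listener;
    }

    /** Call after each chunk has been written to the sink. */
    void onBytesWritten(int count) {
        uploadedBytes += count;
        // Treat an empty file as complete to avoid dividing by zero.
        int percent = totalBytes <= 0 ? 100 : (int) (100 * uploadedBytes / totalBytes);
        if (percent != lastReported) {
            lastReported = percent;
            listener.accept(percent);
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        ThrottledProgress progress = new ThrottledProgress(1_000_000, seen::add);
        // Simulate writing a 1,000,000-byte file in 2048-byte chunks.
        long remaining = 1_000_000;
        int chunks = 0;
        while (remaining > 0) {
            int chunk = (int) Math.min(2048, remaining);
            progress.onBytesWritten(chunk);
            remaining -= chunk;
            chunks++;
        }
        System.out.println("chunks: " + chunks + ", callbacks: " + seen.size());
    }
}
```

Inside writeTo you would call onBytesWritten(read) right after sink.write(buffer, 0, read) and let the listener post to the main thread.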

I modified Yuriy Kolbasinskiy's answer to use RxJava and Kotlin, and added a workaround for using HttpLoggingInterceptor at the same time:

class ProgressRequestBody : RequestBody {


val mFile: File
val ignoreFirstNumberOfWriteToCalls : Int




constructor(mFile: File) : super(){
this.mFile = mFile
ignoreFirstNumberOfWriteToCalls = 0
}


constructor(mFile: File, ignoreFirstNumberOfWriteToCalls : Int) : super(){
this.mFile = mFile
this.ignoreFirstNumberOfWriteToCalls = ignoreFirstNumberOfWriteToCalls
}




var numWriteToCalls = 0


protected val getProgressSubject: PublishSubject<Float> = PublishSubject.create<Float>()


fun getProgressSubject(): Observable<Float> {
return getProgressSubject
}




override fun contentType(): MediaType {
return MediaType.parse("video/mp4")
}


@Throws(IOException::class)
override fun contentLength(): Long {
return mFile.length()
}


@Throws(IOException::class)
override fun writeTo(sink: BufferedSink) {
numWriteToCalls++


val fileLength = mFile.length()
val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
val `in` = FileInputStream(mFile)
var uploaded: Long = 0


try {
var read: Int
var lastProgressPercentUpdate = 0.0f
read = `in`.read(buffer)
while (read != -1) {


uploaded += read.toLong()
sink.write(buffer, 0, read)
read = `in`.read(buffer)


// when using HttpLoggingInterceptor it calls writeTo and passes data into a local buffer just for logging purposes.
// the second call to write to is the progress we actually want to track
if (numWriteToCalls > ignoreFirstNumberOfWriteToCalls ) {
val progress = (uploaded.toFloat() / fileLength.toFloat()) * 100f
//prevent publishing too many updates, which slows upload, by checking if the upload has progressed by at least 1 percent
if (progress - lastProgressPercentUpdate > 1 || progress == 100f) {
// publish progress
getProgressSubject.onNext(progress)
lastProgressPercentUpdate = progress
}
}
}
} finally {
`in`.close()
}
}




companion object {


private val DEFAULT_BUFFER_SIZE = 2048
}
}

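An example video upload interface:

public interface Api {


// No @Multipart here: Retrofit does not allow @Body together with @Multipart,
// and the MultipartBody passed in already carries the multipart content type.
@POST("/upload")
Observable<ResponseBody> uploadVideo(@Body MultipartBody requestBody);
}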

An example function to post a video:

fun postVideo(){
val api : Api = Retrofit.Builder()
.client(OkHttpClient.Builder()
//.addInterceptor(HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY))
.build())
.baseUrl("BASE_URL")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build()
.create(Api::class.java)


val videoPart = ProgressRequestBody(File(VIDEO_URI))
//val videoPart = ProgressRequestBody(File(VIDEO_URI), 1) //HttpLoggingInterceptor workaround
val requestBody = MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("example[name]", place.providerId)
.addFormDataPart("example[video]","video.mp4", videoPart)
.build()


videoPart.getProgressSubject()
.subscribeOn(Schedulers.io())
.subscribe { percentage ->
Log.i("PROGRESS", "${percentage}%")
}


var postSub : Disposable?= null
postSub = api.uploadVideo(requestBody)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ r ->
},{e->
e.printStackTrace()
postSub?.dispose();


}, {
Toast.makeText(this,"Upload SUCCESS!!",Toast.LENGTH_LONG).show()
postSub?.dispose();
})
}

I tried to use the code above, but I found that the UI was getting stuck, so I used the following code instead. It works for me, and you may want to try it as well.

@luca992 Thank you for your answer. I have implemented this in Java and now it is working fine.

public class ProgressRequestBodyObservable extends RequestBody {


File file;
int ignoreFirstNumberOfWriteToCalls;
int numWriteToCalls;


public ProgressRequestBodyObservable(File file) {
this.file = file;


ignoreFirstNumberOfWriteToCalls =0;
}


public ProgressRequestBodyObservable(File file, int ignoreFirstNumberOfWriteToCalls) {
this.file = file;
this.ignoreFirstNumberOfWriteToCalls = ignoreFirstNumberOfWriteToCalls;
}




PublishSubject<Float> floatPublishSubject = PublishSubject.create();


public Observable<Float> getProgressSubject(){
return floatPublishSubject;
}


@Override
public MediaType contentType() {
return MediaType.parse("image/*");
}


@Override
public long contentLength() throws IOException {
return file.length();
}






@Override
public void writeTo(BufferedSink sink) throws IOException {
numWriteToCalls++;




float fileLength = file.length();
byte[] buffer = new byte[2048];
FileInputStream in = new  FileInputStream(file);
float uploaded = 0;


try {
int read;
read = in.read(buffer);
float lastProgressPercentUpdate = 0;
while (read != -1) {


uploaded += read;
sink.write(buffer, 0, read);
read = in.read(buffer);


// when using HttpLoggingInterceptor it calls writeTo and passes data into a local buffer just for logging purposes.
// the second call to write to is the progress we actually want to track
if (numWriteToCalls > ignoreFirstNumberOfWriteToCalls ) {
float progress = (uploaded / fileLength) * 100;
//prevent publishing too many updates, which slows upload, by checking if the upload has progressed by at least 1 percent
if (progress - lastProgressPercentUpdate > 1 || progress == 100f) {
// publish progress
floatPublishSubject.onNext(progress);
lastProgressPercentUpdate = progress;
}
}
}
} finally {
in.close();
}


}
}

Remove the HTTP logging interceptor from the OkHttp client builder; otherwise writeTo() will be called twice. Alternatively, change the logging level to something other than BODY.

To avoid the double-run issue, we can initialize a flag to zero and set it to one after the first pass, so that progress is only reported from the second call onward.

 @Override
public void writeTo(BufferedSink sink) throws IOException {


Source source = null;
try {
source = Okio.source(mFile);
total = 0;
long read;


Handler handler = new Handler(Looper.getMainLooper());


while ((read = source.read(sink.buffer(), DEFAULT_BUFFER_SIZE)) != -1) {


total += read;
sink.flush();


// flag for avoiding first progress bar .
if (flag != 0) {
handler.post(() -> mListener.onProgressUpdate((int) (100 * total / mFile.length())));


}
}


flag = 1;


} finally {
Util.closeQuietly(source);
}
}
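
Both the flag above and the ignoreFirstNumberOfWriteToCalls counter from the earlier answers boil down to the same rule: count how many times writeTo has started and drop progress events until the configured number of passes is over. A minimal sketch of that rule in plain Java follows; PassAwareProgress, onPassStarted and onProgress are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical helper: forwards progress only after the ignored passes are over. */
final class PassAwareProgress {
    private final int passesToIgnore;
    private final IntConsumer listener;
    private int passesStarted = 0;

    PassAwareProgress(int passesToIgnore, IntConsumer listener) {
        this.passesToIgnore = passesToIgnore;
        this.listener = listener;
    }

    /** Call at the top of writeTo(). */
    void onPassStarted() {
        passesStarted++;
    }

    /** Call after each chunk; the event is dropped while an ignored pass is running. */
    void onProgress(int percent) {
        if (passesStarted > passesToIgnore) {
            listener.accept(percent);
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        // Ignore one pass, e.g. the one made by a body-logging interceptor.
        PassAwareProgress progress = new PassAwareProgress(1, seen::add);
        for (int pass = 0; pass < 2; pass++) {
            progress.onPassStarted();
            for (int percent = 25; percent <= 100; percent += 25) {
                progress.onProgress(percent);
            }
        }
        System.out.println("reported: " + seen);
    }
}
```

The number of passes to ignore depends on how many interceptors read the body before the real upload, so it has to be chosen to match your client configuration.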

You can use FileUploader, which uses the Retrofit library to connect to the API. The code skeleton for uploading files is as follows:

FileUploader fileUploader = new FileUploader();
fileUploader.uploadFiles("/", "file", filesToUpload, new FileUploader.FileUploaderCallback() {
@Override
public void onError() {
// Hide progressbar
}


@Override
public void onFinish(String[] responses) {
// Hide progressbar


for(int i=0; i< responses.length; i++){
String str = responses[i];
Log.e("RESPONSE "+i, responses[i]);
}
}


@Override
public void onProgressUpdate(int currentpercent, int totalpercent, int filenumber) {
// Update Progressbar
Log.e("Progress Status", currentpercent+" "+totalpercent+" "+filenumber);
}
});

Full steps are available at Medium:

Retrofit multiple file upload with progress in Android

This answer is for MultipartBody and uploading multiple files. My server-side code is an MVC application. First, you need an ApiService interface like this:

public interface ApiService {


@POST("Home/UploadVideos")
Call<ResponseBody> postMeme(@Body RequestBody files);
}

You also need an ApiClient like this:

public class ApiClient {
public static final String API_BASE_URL = "http://192.168.43.243/Web/";


private static OkHttpClient.Builder httpClient = new OkHttpClient.Builder();


private static Retrofit.Builder builder = new Retrofit.Builder().baseUrl(API_BASE_URL).addConverterFactory(GsonConverterFactory.create());


public static ApiService createService(Class<ApiService> serviceClass)
{
Retrofit retrofit = builder.client(httpClient.build()).build();
return retrofit.create(serviceClass);
}
}

After that, you need a RequestBody class like this:

public class CountingFileRequestBody extends RequestBody {
private static final String TAG = "CountingFileRequestBody";


private final ProgressListener listener;
private final String key;
private final MultipartBody multipartBody;
protected CountingSink mCountingSink;


public CountingFileRequestBody(MultipartBody multipartBody,
String key,
ProgressListener listener) {
this.multipartBody = multipartBody;
this.listener = listener;
this.key = key;
}


@Override
public long contentLength() throws IOException {
return multipartBody.contentLength();
}


@Override
public MediaType contentType() {
return multipartBody.contentType();
}


@Override
public void writeTo(BufferedSink sink) throws IOException {
mCountingSink = new CountingSink(sink);
BufferedSink bufferedSink = Okio.buffer(mCountingSink);
multipartBody.writeTo(bufferedSink);
bufferedSink.flush();
}


public interface ProgressListener {
void transferred(String key, int num);
}


protected final class CountingSink extends ForwardingSink {
private long bytesWritten = 0;


public CountingSink(Sink delegate) {
super(delegate);
}


@Override
public void write(Buffer source, long byteCount) throws IOException {
bytesWritten += byteCount;
listener.transferred(key, (int) (100F * bytesWritten / contentLength()));
super.write(source, byteCount);
delegate().flush(); // I have added this line to manually flush the sink
}
}


}
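
The CountingSink above is a decorator: it forwards every write to the real sink and counts the bytes on the way through. For readers who have not used Okio, the same pattern can be sketched with java.io's FilterOutputStream. This is an analogy only, not the Okio API; CountingOutputStream and its callback parameter are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;

/** Hypothetical java.io analogue of CountingSink: forwards writes and counts bytes. */
final class CountingOutputStream extends FilterOutputStream {
    private final LongConsumer onBytesWritten;
    private long bytesWritten = 0;

    CountingOutputStream(OutputStream delegate, LongConsumer onBytesWritten) {
        super(delegate);
        this.onBytesWritten = onBytesWritten;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        report(1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Forward the chunk in one call; the inherited version writes byte by byte.
        out.write(b, off, len);
        report(len);
    }

    private void report(int count) {
        bytesWritten += count;
        onBytesWritten.accept(bytesWritten);
    }

    long getBytesWritten() {
        return bytesWritten;
    }

    public static void main(String[] args) throws IOException {
        final long total = 5000;
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        try (CountingOutputStream counting = new CountingOutputStream(target,
                written -> System.out.println((100 * written / total) + "%"))) {
            byte[] chunk = new byte[2048];
            long remaining = total;
            while (remaining > 0) {
                int len = (int) Math.min(chunk.length, remaining);
                counting.write(chunk, 0, len);
                remaining -= len;
            }
        }
    }
}
```

The advantage of the decorator, here as in the answer above, is that the wrapped body (a whole MultipartBody in that answer) does not need to know anything about progress reporting.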

Finally, you need this code to start the upload:

ApiService service = ApiClient.createService(ApiService.class);


MultipartBody.Builder builder = new MultipartBody.Builder();
builder.setType(MultipartBody.FORM);
builder.addFormDataPart("files",file1.getName(), RequestBody.create(MediaType.parse("video/*"), file1));
builder.addFormDataPart("files",file3.getName(), RequestBody.create(MediaType.parse("video/*"), file3));


MultipartBody requestBody = builder.build();


CountingFileRequestBody requestBody1 = new CountingFileRequestBody(requestBody, "files", new CountingFileRequestBody.ProgressListener() {
@Override
public void transferred(String key, int num) {
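Log.d("FinishAdapter","Percentage is :"+num);
//update progressbar here
//note: with enqueue() this callback runs on a background thread, so post to the main thread before touching views
dialog.updateProgress(num);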
if (num == 100){
dialog.dismiss();
}


}
});


Call<ResponseBody> call = service.postMeme(requestBody1);
call.enqueue(new Callback<ResponseBody>() {
@Override
public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {
// Toast.makeText(getBaseContext(),"All fine",Toast.LENGTH_SHORT).show();
Log.d("FinishAdapter","every thing is ok............!");
Log.d("FinishAdapter",response.toString());
}


@Override
public void onFailure(Call<ResponseBody> call, Throwable t) {
//Toast.makeText(getBaseContext(),t.getMessage(),Toast.LENGTH_SHORT).show();
Log.d("FinishAdapter","every thing is failed............!");
}
});

hope it helps.

An extension function for creating a Part. The callback is invoked while the service call is uploading the file:

fun File.toPart(type: String = "image/*", callback: (progress: Int)->Unit) = MultipartBody.Part.createFormData(name, name, object : RequestBody() {
val contentType = MediaType.parse(type)
val length = this@toPart.length()
var uploaded = 0L
override fun contentType(): MediaType? {
return contentType
}


override fun contentLength(): Long = length


@Throws(IOException::class)
override fun writeTo(sink: BufferedSink) {
var source: Source? = null
try {
source = Okio.source(this@toPart)


do {
val read = source.read(sink.buffer(), 2048)
if(read == -1L) return // exit at EOF
sink.flush()
uploaded += read
callback((uploaded.toDouble()/length.toDouble()*100).toInt())
} while(true)
//sink.writeAll(source!!)
} finally {
Util.closeQuietly(source)
}
}
})

I realize this question was answered years ago, but I thought I'd update it for Kotlin:

Create a class that extends RequestBody. Be sure to populate the ContentType enum class to use whichever content types you need to support.

class RequestBodyWithProgress(
private val file: File,
private val contentType: ContentType,
private val progressCallback:((progress: Float)->Unit)?
) : RequestBody() {


override fun contentType(): MediaType? = MediaType.parse(contentType.description)


override fun contentLength(): Long = file.length()


override fun writeTo(sink: BufferedSink) {
val fileLength = contentLength()
val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
val inSt = FileInputStream(file)
var uploaded = 0L
inSt.use {
var read: Int = inSt.read(buffer)
val handler = Handler(Looper.getMainLooper())
while (read != -1) {
uploaded += read
// always write the chunk, even when no progress callback was supplied
sink.write(buffer, 0, read)
progressCallback?.let {
val progress = (uploaded.toDouble() / fileLength.toDouble()).toFloat()
handler.post { it(progress) }
}
read = inSt.read(buffer)
}
}
}


enum class ContentType(val description: String) {
PNG_IMAGE("image/png"),
JPG_IMAGE("image/jpeg"),
IMAGE("image/*")
}
}

Upload the file using Retrofit:

fun uploadFile(fileUri: Uri, progressCallback:((progress: Float)->Unit)?) {
val file = File(fileUri.path)
if (!file.exists()) throw FileNotFoundException(fileUri.path)


// create RequestBody instance from file
val requestFile = RequestBodyWithProgress(file, RequestBodyWithProgress.ContentType.PNG_IMAGE, progressCallback)


// MultipartBody.Part is used to send also the actual file name
val body = MultipartBody.Part.createFormData("image_file", file.name, requestFile)


publicApiService().uploadFile(body).enqueue(object : Callback<MyResponseObj> {
override fun onFailure(call: Call<MyResponseObj>, t: Throwable) {


}


override fun onResponse(call: Call<MyResponseObj>, response: Response<MyResponseObj>) {


}
})


}

I appreciate @Yuriy Kolbasinskiy's answer, but it gave me the error "expected 3037038 bytes but received 3039232", so I made some changes to the writeTo() function. The result, in Kotlin, is given below:

override fun writeTo(sink: BufferedSink) {
var uploaded:Long = 0
var source: Source? = null
try {
source = Okio.source(file)
val handler = Handler(Looper.getMainLooper())


do {
val read = source.read(sink.buffer(), 2048)
if (read == -1L) return // exit at EOF
uploaded += read


handler.post(ProgressUpdater(uploaded, file.length()))
sink.flush()
} while(true)
} finally {
Util.closeQuietly(source)
}
}

Here is an extension function for Kotlin.

/** Returns a new request body that transmits the content of this. */
fun File.asRequestBodyWithProgress(
contentType: MediaType? = null,
progressCallback: ((progress: Float) -> Unit)?
): RequestBody {
return object : RequestBody() {
override fun contentType() = contentType


override fun contentLength() = length()


override fun writeTo(sink: BufferedSink) {
val fileLength = contentLength()
val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
val inSt = FileInputStream(this@asRequestBodyWithProgress)
var uploaded = 0L
inSt.use {
var read: Int = inSt.read(buffer)
val handler = Handler(Looper.getMainLooper())
while (read != -1) {
uploaded += read
// always write the chunk, even when no progress callback was supplied
sink.write(buffer, 0, read)
progressCallback?.let {
val progress = (uploaded.toDouble() / fileLength.toDouble()).toFloat()
handler.post { it(progress) }
}
read = inSt.read(buffer)
}
}
}
}
}

Here is how to use the above function with Flow:

private val key = "file"
private val multiPart = "multipart/form-data".toMediaType()


@WorkerThread
fun uploadFile(
path: String,
onStart: () -> Unit,
onComplete: () -> Unit,
onProgress: (progress: Float) -> Unit,
onError: (String?) -> Unit
) = flow {
val file = File(path)
val requestFile = file.asRequestBodyWithProgress(multiPart, onProgress)
val requestBody = MultipartBody.Part.createFormData(key, file.name, requestFile)
//val requestBody = MultipartBody.Builder().addFormDataPart(key, file.name, requestFile).build()


val response = uploadClient.uploadFile(requestBody)
response.suspendOnSuccess {
data.whatIfNotNull {
emit(it)
}
}.onError {
/** maps the [ApiResponse.Failure.Error] to the [ErrorResponse] using the mapper. */
map(ErrorResponseMapper) { onError("[Code: $code]: $message") }
}.onException { onError(message) }
}.onStart { onStart() }.onCompletion { onComplete() }.flowOn(Dispatchers.IO)

UPDATE: Since API 30 it is very hard to get the real path of a file, so we can use an InputStream instead, as shown below.

@WorkerThread
fun uploadFile(
path: String,
onStart: () -> Unit,
onComplete: (String?) -> Unit,
onProgress: (progress: Float) -> Unit,
onError: (String?) -> Unit
) = flow {
openStream(path).whatIfNotNull { inputStream ->
val requestFile = inputStream.asRequestBodyWithProgress(MultipartBody.FORM, onProgress)
val requestBody = MultipartBody.Part.createFormData(key, "file", requestFile)
//val requestBody = MultipartBody.Builder().addFormDataPart(key, file.name, requestFile).build()


uploadClient.uploadFile(requestBody).suspendOnSuccess {
data.whatIfNotNull {
link = it.link
emit(it)
}
}.onError {
/** maps the [ApiResponse.Failure.Error] to the [ErrorResponse] using the mapper. */
map(ErrorResponseMapper) { onError("[Code: $code]: $message") }
}.onException { onError(message) }
}
}.onStart { onStart() }.onCompletion { onComplete(link) }.flowOn(Dispatchers.IO)


private fun openStream(path: String): InputStream? {
return context.contentResolver.openInputStream(Uri.parse(path))
}




/** Returns a new request body that transmits the content of this. */
fun InputStream.asRequestBodyWithProgress(
contentType: MediaType? = null,
progressCallback: ((progress: Float) -> Unit)?
): RequestBody {
return object : RequestBody() {
override fun contentType() = contentType


// Note: available() is only an estimate of the bytes that can be read without blocking.
override fun contentLength() = try {
available().toLong()
} catch (e: IOException){
Timber.e(e)
0L
}


override fun writeTo(sink: BufferedSink) {
val fileLength = contentLength()
val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
val inputStream = this@asRequestBodyWithProgress
var uploaded = 0L
inputStream.use {
var read: Int = inputStream.read(buffer)
val handler = Handler(Looper.getMainLooper())
while (read != -1) {
uploaded += read
// always write the chunk, even when no progress callback was supplied
sink.write(buffer, 0, read)
progressCallback?.let {
val progress = (uploaded.toDouble() / fileLength.toDouble()).toFloat()
handler.post { it(progress) }
}
read = inputStream.read(buffer)
}
}
}
}
}

Here is an extension function heavily inspired by this thread. It is working really well for me.

fun File.toRequestBody(progressCallback: ((progress: Int) -> Unit)?): RequestBody {
return object : RequestBody() {


private var currentProgress = 0
private var uploaded = 0L


override fun contentType(): MediaType? {
val fileType = name.substringAfterLast('.', "")
return fileType.toMediaTypeOrNull()
}


@Throws(IOException::class)
override fun writeTo(sink: BufferedSink) {
source().use { source ->
do {
val read = source.read(sink.buffer, 2048)
if (read == -1L) return // exit at EOF
sink.flush()
uploaded += read


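/**
* The value of newProgress ends up between 0.0 and 2.0 rather than 0.0 and 1.0,
* presumably because `uploaded` keeps accumulating when writeTo is invoked twice,
* for example when HttpLoggingInterceptor logs the body (see above).
*/
var newProgress = ((uploaded.toDouble() / length().toDouble()))


/**
* To map it to 0.0 - 100.0 we multiply by 50
* (OutputMaxRange / InputMaxRange = 100 / 2 = 50).
*/
newProgress = (50 * newProgress)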


if (newProgress.toInt() != currentProgress) {
progressCallback?.invoke(newProgress.toInt())
}
currentProgress = newProgress.toInt()
} while (true)
}
}
}
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
long fileLength = mFile.length();
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
FileInputStream in = new FileInputStream(mFile);
long uploaded = 0;
try {
int read;
Handler handler = new Handler(Looper.getMainLooper());
int num = 0;
while ((read = in.read(buffer)) != -1) {
sink.write(buffer, 0, read);
uploaded += read;


final int progress = (int) (100 * uploaded / fileLength);
if (progress > num + 1) {
// writeTo runs on a background thread, so post to the main thread
handler.post(() -> mListener.onProgressUpdate(progress));
num = progress;
}
if (uploaded == fileLength) {
handler.post(() -> mListener.onFinish());
}
}
} finally {
in.close();
}
}

This code ensures that onFinish is called once the whole file has been written.