package org.adamalang.caravan;

import ch.qos.logback.core.joran.util.beans.BeanUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.rtsp.RtspHeaders;
import io.netty.util.internal.PlatformDependent;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.adamalang.ErrorCodes;
import org.adamalang.caravan.contracts.ByteArrayStream;
import org.adamalang.caravan.contracts.Cloud;
import org.adamalang.caravan.data.DurableListStore;
import org.adamalang.caravan.events.AssetByteAccountant;
import org.adamalang.caravan.events.ByteArrayHelper;
import org.adamalang.caravan.events.EventCodec;
import org.adamalang.caravan.events.Events;
import org.adamalang.caravan.events.LocalCache;
import org.adamalang.caravan.events.RestoreLoader;
import org.adamalang.caravan.events.RestoreWalker;
import org.adamalang.common.Callback;
import org.adamalang.common.ErrorCodeException;
import org.adamalang.common.NamedRunnable;
import org.adamalang.common.ProtectedUUID;
import org.adamalang.common.SimpleExecutor;
import org.adamalang.common.Stream;
import org.adamalang.runtime.contracts.DeleteTask;
import org.adamalang.runtime.data.ArchivingDataService;
import org.adamalang.runtime.data.BackupResult;
import org.adamalang.runtime.data.ComputeMethod;
import org.adamalang.runtime.data.DocumentRestore;
import org.adamalang.runtime.data.DocumentSnapshot;
import org.adamalang.runtime.data.Key;
import org.adamalang.runtime.data.LocalDocumentChange;
import org.adamalang.runtime.data.RemoteDocumentUpdate;
import org.adamalang.runtime.json.JsonStreamWriter;
import org.jline.terminal.impl.AbstractWindowsTerminal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/adamalang/caravan/CaravanDataService.class */
public class CaravanDataService implements ArchivingDataService {
    private final CaravanMetrics metrics;
    private final Cloud cloud;
    private final DurableListStore store;
    private final SimpleExecutor executor;
    private final Logger LOGGER = LoggerFactory.getLogger((Class<?>) CaravanDataService.class);
    private final HashMap<Key, LocalCache> cache = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.adamalang.caravan.CaravanDataService$2, reason: invalid class name */
    /* loaded from: input_file:org/adamalang/caravan/CaravanDataService$2.class */
    public class AnonymousClass2 implements Callback<File> {
        final /* synthetic */ Callback val$callback;
        final /* synthetic */ Key val$key;

        AnonymousClass2(Callback callback, Key key) {
            this.val$callback = callback;
            this.val$key = key;
        }

        @Override // org.adamalang.common.Callback
        public void success(File file) {
            try {
                ArrayList<byte[]> load = RestoreLoader.load(file);
                CaravanDataService caravanDataService = CaravanDataService.this;
                Key key = this.val$key;
                Callback callback = this.val$callback;
                Key key2 = this.val$key;
                Callback callback2 = this.val$callback;
                caravanDataService.execute("restore", key, false, callback, localCache -> {
                    if (localCache != null) {
                        CaravanDataService.this.mergeRestore(key2, localCache, load, callback2);
                        return;
                    }
                    try {
                        CaravanDataService.this.store.read(key2, new LocalCache() { // from class: org.adamalang.caravan.CaravanDataService.2.1
                            @Override // org.adamalang.caravan.contracts.ByteArrayStream
                            public void finished() {
                                CaravanDataService.this.mergeRestore(key2, this, load, new Callback<Void>() { // from class: org.adamalang.caravan.CaravanDataService.2.1.1
                                    @Override // org.adamalang.common.Callback
                                    public void success(Void r5) {
                                        CaravanDataService.this.addToCacheIfDoesntExistReturnCorrect(key2, this);
                                        callback2.success(null);
                                    }

                                    @Override // org.adamalang.common.Callback
                                    public void failure(ErrorCodeException errorCodeException) {
                                        callback2.failure(errorCodeException);
                                    }
                                });
                            }
                        });
                    } catch (Exception e) {
                        CaravanDataService.this.LOGGER.error("failed-restore", (Throwable) e);
                        callback2.failure(new ErrorCodeException(ErrorCodes.CARAVAN_CANT_RESTORE_CANT_READ, e));
                    }
                });
            } catch (Exception e) {
                this.val$callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_CANT_RESTORE_EXCEPTION, e));
            }
        }

        @Override // org.adamalang.common.Callback
        public void failure(ErrorCodeException errorCodeException) {
            this.val$callback.failure(errorCodeException);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.adamalang.caravan.CaravanDataService$6, reason: invalid class name */
    /* loaded from: input_file:org/adamalang/caravan/CaravanDataService$6.class */
    public class AnonymousClass6 extends NamedRunnable {
        final /* synthetic */ Key val$key;
        final /* synthetic */ boolean val$load;
        final /* synthetic */ String val$name;
        final /* synthetic */ Consumer val$action;
        final /* synthetic */ Callback val$callback;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass6(String str, String[] strArr, Key key, boolean z, String str2, Consumer consumer, Callback callback) {
            super(str, strArr);
            this.val$key = key;
            this.val$load = z;
            this.val$name = str2;
            this.val$action = consumer;
            this.val$callback = callback;
        }

        @Override // org.adamalang.common.NamedRunnable
        public void execute() throws Exception {
            LocalCache localCache = CaravanDataService.this.cache.get(this.val$key);
            if (this.val$load && localCache == null) {
                CaravanDataService.this.load(this.val$key, new Callback<LocalCache>() { // from class: org.adamalang.caravan.CaravanDataService.6.1
                    @Override // org.adamalang.common.Callback
                    public void success(final LocalCache localCache2) {
                        CaravanDataService.this.executor.execute(new NamedRunnable(AnonymousClass6.this.val$name, new String[]{"load"}) { // from class: org.adamalang.caravan.CaravanDataService.6.1.1
                            @Override // org.adamalang.common.NamedRunnable
                            public void execute() throws Exception {
                                AnonymousClass6.this.val$action.accept(CaravanDataService.this.addToCacheIfDoesntExistReturnCorrect(AnonymousClass6.this.val$key, localCache2));
                            }
                        });
                    }

                    @Override // org.adamalang.common.Callback
                    public void failure(ErrorCodeException errorCodeException) {
                        CaravanDataService.this.LOGGER.error("caravan-failure-load:" + errorCodeException.code + "/msg=" + errorCodeException.getMessage());
                        AnonymousClass6.this.val$callback.failure(errorCodeException);
                    }
                });
            } else {
                this.val$action.accept(localCache);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.adamalang.caravan.CaravanDataService$9, reason: invalid class name */
    /* loaded from: input_file:org/adamalang/caravan/CaravanDataService$9.class */
    public class AnonymousClass9 extends NamedRunnable {
        final /* synthetic */ Key val$key;
        final /* synthetic */ Stream val$stream;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass9(String str, String[] strArr, Key key, Stream stream) {
            super(str, strArr);
            this.val$key = key;
            this.val$stream = stream;
        }

        @Override // org.adamalang.common.NamedRunnable
        public void execute() throws Exception {
            CaravanDataService.this.store.read(this.val$key, new ByteArrayStream() { // from class: org.adamalang.caravan.CaravanDataService.9.1
                @Override // org.adamalang.caravan.contracts.ByteArrayStream
                public void next(int i, byte[] bArr, final int i2, long j) throws Exception {
                    EventCodec.route(Unpooled.wrappedBuffer(bArr), new EventCodec.HandlerEvent() { // from class: org.adamalang.caravan.CaravanDataService.9.1.1
                        @Override // org.adamalang.caravan.events.EventCodec.HandlerEvent
                        public void handle(Events.Snapshot snapshot) {
                            AnonymousClass9.this.val$stream.next("[" + (i2 == snapshot.seq ? Integer.valueOf(i2) : i2 + "/" + snapshot.seq) + "] SNAPSHOT:" + snapshot.document + " (history=" + snapshot.history + ", assets=" + snapshot.assetBytes + ")");
                        }

                        @Override // org.adamalang.caravan.events.EventCodec.HandlerEvent
                        public void handle(Events.Batch batch) {
                            if (batch.changes.length > 1) {
                                AnonymousClass9.this.val$stream.next("BATCH:" + batch.changes.length);
                            }
                            for (Events.Change change : batch.changes) {
                                handle(change);
                            }
                        }

                        @Override // org.adamalang.caravan.events.EventCodec.HandlerEvent
                        public void handle(Events.Recover recover) {
                            AnonymousClass9.this.val$stream.next("RECOVER:" + recover.seq);
                        }

                        @Override // org.adamalang.caravan.events.EventCodec.HandlerEvent
                        public void handle(Events.Change change) {
                            AnonymousClass9.this.val$stream.next("[" + i2 + ":" + change.seq_begin + "-->" + change.seq_end + "] REQUEST:" + change.request + " ; REDO:" + change.redo + " ; UNDO=" + change.undo + " (active=" + change.active + ")");
                        }
                    });
                }

                @Override // org.adamalang.caravan.contracts.ByteArrayStream
                public void finished() throws Exception {
                    AnonymousClass9.this.val$stream.complete();
                }
            });
        }
    }

    public CaravanDataService(CaravanMetrics caravanMetrics, Cloud cloud, DurableListStore durableListStore, SimpleExecutor simpleExecutor) {
        this.metrics = caravanMetrics;
        this.cloud = cloud;
        this.store = durableListStore;
        this.executor = simpleExecutor;
        scheduleReport(0);
    }

    private void scheduleReport(int i) {
        this.executor.schedule(new NamedRunnable("disk-report", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.1
            @Override // org.adamalang.common.NamedRunnable
            public void execute() throws Exception {
                CaravanDataService.this.store.report();
                CaravanDataService.this.scheduleReport((int) (30000.0d + (60000.0d * Math.random())));
            }
        }, i);
    }

    @Override // org.adamalang.runtime.data.ArchivingDataService
    public void cleanUp(Key key, String str) {
        this.cloud.delete(key, str, Callback.DONT_CARE_VOID);
        File file = new File(new File(this.cloud.path(), key.space), str);
        if (file.exists()) {
            file.delete();
        }
    }

    @Override // org.adamalang.runtime.data.ArchivingDataService
    public void restore(Key key, String str, Callback<Void> callback) {
        this.cloud.restore(key, str, new AnonymousClass2(this.metrics.caravan_restore.wrap(callback), key));
    }

    @Override // org.adamalang.runtime.data.ArchivingDataService
    public void backup(Key key, Callback<BackupResult> callback) {
        String str = ProtectedUUID.generate() + "-" + System.currentTimeMillis();
        Callback wrap = this.metrics.caravan_backup.wrap(callback);
        execute("backup", key, true, wrap, localCache -> {
            final int seq = localCache.seq();
            final AtomicLong atomicLong = new AtomicLong(0L);
            final AssetByteAccountant assetByteAccountant = new AssetByteAccountant();
            File file = new File(this.cloud.path(), key.space);
            if (!file.exists()) {
                file.mkdir();
            }
            File file2 = new File(file, str + ".temp");
            File file3 = new File(file, str);
            try {
                final DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(file2));
                this.store.read(key, new ByteArrayStream() { // from class: org.adamalang.caravan.CaravanDataService.3
                    @Override // org.adamalang.caravan.contracts.ByteArrayStream
                    public void next(int i, byte[] bArr, int i2, long j) throws Exception {
                        atomicLong.addAndGet(bArr.length);
                        assetByteAccountant.account(bArr, j);
                        dataOutputStream.writeBoolean(true);
                        dataOutputStream.writeInt(bArr.length);
                        dataOutputStream.write(bArr);
                    }

                    @Override // org.adamalang.caravan.contracts.ByteArrayStream
                    public void finished() throws Exception {
                        dataOutputStream.writeBoolean(false);
                    }
                });
                dataOutputStream.flush();
                dataOutputStream.close();
                Files.move(file2.toPath(), file3.toPath(), StandardCopyOption.ATOMIC_MOVE);
                if (assetByteAccountant.hasThereBeenDataloss()) {
                    this.LOGGER.error("detected data loss during a backup:" + key.space + "/" + key.key + "->" + str);
                    this.metrics.caravan_datalog_loss.up();
                }
                this.cloud.backup(key, file3, new Callback<Void>() { // from class: org.adamalang.caravan.CaravanDataService.4
                    @Override // org.adamalang.common.Callback
                    public void success(Void r11) {
                        wrap.success(new BackupResult(str, seq, atomicLong.get(), assetByteAccountant.getBytes()));
                    }

                    @Override // org.adamalang.common.Callback
                    public void failure(ErrorCodeException errorCodeException) {
                        wrap.failure(errorCodeException);
                    }
                });
            } catch (Exception e) {
                this.LOGGER.error("failed-backup", (Throwable) e);
                wrap.failure(new ErrorCodeException(ErrorCodes.CARAVAN_CANT_BACKUP_EXCEPTION, e));
            }
        });
    }

    private LocalCache addToCacheIfDoesntExistReturnCorrect(Key key, LocalCache localCache) {
        LocalCache putIfAbsent = this.cache.putIfAbsent(key, localCache);
        if (putIfAbsent == null) {
            return localCache;
        }
        this.metrics.caravan_waste.run();
        return putIfAbsent;
    }

    public void diagnostics(final Callback<String> callback) {
        this.executor.execute(new NamedRunnable("diagnostics", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.5
            @Override // org.adamalang.common.NamedRunnable
            public void execute() throws Exception {
                JsonStreamWriter jsonStreamWriter = new JsonStreamWriter();
                jsonStreamWriter.beginObject();
                jsonStreamWriter.writeObjectFieldIntro("platform");
                jsonStreamWriter.beginObject();
                jsonStreamWriter.writeObjectFieldIntro("unsafe");
                jsonStreamWriter.writeBoolean(PlatformDependent.hasUnsafe());
                jsonStreamWriter.writeObjectFieldIntro("prefer-direct");
                jsonStreamWriter.writeBoolean(PlatformDependent.directBufferPreferred());
                jsonStreamWriter.writeObjectFieldIntro("no-cleaner");
                jsonStreamWriter.writeBoolean(PlatformDependent.useDirectBufferNoCleaner());
                jsonStreamWriter.writeObjectFieldIntro(AbstractWindowsTerminal.TYPE_WINDOWS);
                jsonStreamWriter.writeBoolean(PlatformDependent.isWindows());
                jsonStreamWriter.writeObjectFieldIntro("osx");
                jsonStreamWriter.writeBoolean(PlatformDependent.isOsx());
                jsonStreamWriter.writeObjectFieldIntro("is-j9");
                jsonStreamWriter.writeBoolean(PlatformDependent.isJ9Jvm());
                ByteBuf buffer = Unpooled.buffer();
                jsonStreamWriter.writeObjectFieldIntro("byte-buf-class");
                jsonStreamWriter.writeString(buffer.getClass().getName());
                buffer.writeIntLE(42);
                buffer.writeLongLE(13421342L);
                jsonStreamWriter.writeObjectFieldIntro("sanity-size");
                jsonStreamWriter.writeInteger(ByteArrayHelper.convert(buffer).length);
                jsonStreamWriter.endObject();
                jsonStreamWriter.writeObjectFieldIntro("cache");
                jsonStreamWriter.beginObject();
                for (Map.Entry<Key, LocalCache> entry : CaravanDataService.this.cache.entrySet()) {
                    jsonStreamWriter.writeObjectFieldIntro(entry.getKey().space + "/" + entry.getKey().key);
                    jsonStreamWriter.beginObject();
                    jsonStreamWriter.writeObjectFieldIntro(RtspHeaders.Values.SEQ);
                    jsonStreamWriter.writeInteger(entry.getValue().seq());
                    jsonStreamWriter.endObject();
                }
                jsonStreamWriter.endObject();
                jsonStreamWriter.writeObjectFieldIntro("working-set");
                jsonStreamWriter.beginObject();
                jsonStreamWriter.writeObjectFieldIntro("index");
                jsonStreamWriter.beginArray();
                Iterator<Long> it = CaravanDataService.this.store.listIndex().iterator();
                while (it.hasNext()) {
                    jsonStreamWriter.writeLong(it.next().longValue());
                }
                jsonStreamWriter.endArray();
                jsonStreamWriter.writeObjectFieldIntro("keys");
                jsonStreamWriter.beginObject();
                for (Map.Entry<Key, Integer> entry2 : CaravanDataService.this.store.map().entrySet()) {
                    jsonStreamWriter.writeObjectFieldIntro(entry2.getKey().space + "/" + entry2.getKey().key);
                    jsonStreamWriter.writeInteger(entry2.getValue().intValue());
                }
                jsonStreamWriter.endObject();
                jsonStreamWriter.endObject();
                jsonStreamWriter.endObject();
                callback.success(jsonStreamWriter.toString());
            }
        });
    }

    private <T> void execute(String str, Key key, boolean z, Callback<T> callback, Consumer<LocalCache> consumer) {
        this.executor.execute(new AnonymousClass6(str, new String[0], key, z, str, consumer, callback));
    }

    private void load(final Key key, final Callback<LocalCache> callback) {
        try {
            this.store.read(key, new LocalCache() { // from class: org.adamalang.caravan.CaravanDataService.7
                @Override // org.adamalang.caravan.contracts.ByteArrayStream
                public void finished() {
                    if (build() == null) {
                        callback.failure(new ErrorCodeException(ErrorCodes.UNIVERSAL_LOOKUP_FAILED, "failed:" + key.space + "/" + key.key));
                    } else {
                        CaravanDataService.this.executor.execute(new NamedRunnable("load-jump-callback", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.7.1
                            @Override // org.adamalang.common.NamedRunnable
                            public void execute() throws Exception {
                                callback.success(this);
                            }
                        });
                    }
                }
            });
        } catch (Exception e) {
            this.LOGGER.error("failed-load-" + key.space + "/" + key.key, (Throwable) e);
            callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_LOAD_FAILURE_EXCEPTION, e));
        }
    }

    private void mergeRestore(Key key, LocalCache localCache, ArrayList<byte[]> arrayList, Callback<Void> callback) {
        ArrayList<byte[]> filter = localCache.filter(arrayList);
        if (filter.size() == 0) {
            callback.success(null);
            return;
        }
        RestoreWalker restoreWalker = new RestoreWalker();
        Iterator<byte[]> it = filter.iterator();
        while (it.hasNext()) {
            EventCodec.route(Unpooled.wrappedBuffer(it.next()), restoreWalker);
        }
        if (this.store.append(key, filter, restoreWalker.seq, restoreWalker.assetBytes, () -> {
            this.executor.execute(new NamedRunnable("restore-write-map", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.8
                @Override // org.adamalang.common.NamedRunnable
                public void execute() throws Exception {
                    Iterator it2 = filter.iterator();
                    while (it2.hasNext()) {
                        EventCodec.route(Unpooled.wrappedBuffer((byte[]) it2.next()), localCache);
                    }
                    callback.success(null);
                }
            });
        }) == null) {
            this.LOGGER.error("out-of-space: caravan unable to merge:" + key.space + "/" + key.key);
            callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_CANT_MERGE_RESTORE_OUT_OF_SPACE));
        }
    }

    public void dumpLog(Key key, Stream<String> stream) {
        this.executor.execute(new AnonymousClass9("dump-log", new String[0], key, stream));
    }

    @Override // org.adamalang.runtime.data.DataService
    public void get(Key key, Callback<LocalDocumentChange> callback) {
        execute(BeanUtil.PREFIX_GETTER_GET, key, true, callback, localCache -> {
            callback.success(localCache.build());
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void initialize(Key key, RemoteDocumentUpdate remoteDocumentUpdate, Callback<Void> callback) {
        Events.Change change = new Events.Change();
        change.copyFrom(remoteDocumentUpdate);
        ByteBuf buffer = Unpooled.buffer();
        EventCodec.write(buffer, change);
        execute("initialize", key, false, callback, localCache -> {
            if (localCache != null || this.store.exists(key)) {
                callback.failure(new ErrorCodeException(ErrorCodes.UNIVERSAL_INITIALIZE_FAILURE));
                return;
            }
            LocalCache localCache = new LocalCache() { // from class: org.adamalang.caravan.CaravanDataService.10
                @Override // org.adamalang.caravan.contracts.ByteArrayStream
                public void finished() {
                }
            };
            localCache.handle(change);
            if (this.store.append(key, ByteArrayHelper.convert(buffer), remoteDocumentUpdate.seqEnd, remoteDocumentUpdate.assetBytes, () -> {
                this.executor.execute(new NamedRunnable("commit-cache", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.11
                    @Override // org.adamalang.common.NamedRunnable
                    public void execute() throws Exception {
                        CaravanDataService.this.addToCacheIfDoesntExistReturnCorrect(key, localCache);
                        callback.success(null);
                    }
                });
            }) == null) {
                this.LOGGER.error("out-of-space: caravan unable to initialize:" + key.space + "/" + key.key);
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_OUT_OF_SPACE_INITIALIZE));
            }
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void patch(Key key, RemoteDocumentUpdate[] remoteDocumentUpdateArr, Callback<Void> callback) {
        Events.Batch batch = new Events.Batch();
        batch.changes = new Events.Change[remoteDocumentUpdateArr.length];
        long j = 0;
        for (int i = 0; i < remoteDocumentUpdateArr.length; i++) {
            batch.changes[i] = new Events.Change();
            batch.changes[i].copyFrom(remoteDocumentUpdateArr[i]);
            j += remoteDocumentUpdateArr[i].assetBytes;
        }
        long j2 = j;
        ByteBuf buffer = Unpooled.buffer();
        EventCodec.write(buffer, batch);
        byte[] convert = ByteArrayHelper.convert(buffer);
        execute("patch", key, true, callback, localCache -> {
            if (!localCache.check(remoteDocumentUpdateArr[0].seqBegin)) {
                this.metrics.caravan_seq_off.run();
                this.LOGGER.error("failure-to-patch-head-patch: given:" + remoteDocumentUpdateArr[0].seqBegin + " have:" + localCache.seq() + " for:" + key.space + "/" + key.key);
                callback.failure(new ErrorCodeException(ErrorCodes.UNIVERSAL_PATCH_FAILURE_HEAD_SEQ_OFF));
            } else if (this.store.append(key, convert, remoteDocumentUpdateArr[remoteDocumentUpdateArr.length - 1].seqEnd, j2, () -> {
                this.executor.execute(new NamedRunnable("patch-commit", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.12
                    @Override // org.adamalang.common.NamedRunnable
                    public void execute() throws Exception {
                        localCache.handle(batch);
                        callback.success(null);
                    }
                });
            }) == null) {
                this.LOGGER.error("out-of-space: caravan unable to patch:" + key.space + "/" + key.key);
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_OUT_OF_SPACE_PATCH));
            }
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void compute(Key key, ComputeMethod computeMethod, int i, Callback<LocalDocumentChange> callback) {
        execute("compute", key, true, callback, localCache -> {
            if (computeMethod == ComputeMethod.HeadPatch) {
                String computeHeadPatch = localCache.computeHeadPatch(i);
                if (computeHeadPatch != null) {
                    callback.success(new LocalDocumentChange(computeHeadPatch, 1, localCache.seq()));
                    return;
                } else {
                    callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_COMPUTE_HEADPATCH_SEQ_NOT_FOUND));
                    return;
                }
            }
            if (computeMethod != ComputeMethod.Rewind) {
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_COMPUTE_METHOD_NOT_FOUND));
                return;
            }
            String computeRewind = localCache.computeRewind(i);
            if (computeRewind != null) {
                callback.success(new LocalDocumentChange(computeRewind, 1, localCache.seq()));
            } else {
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_COMPUTE_REWIND_SEQ_NOT_FOUND));
            }
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void delete(Key key, DeleteTask deleteTask, Callback<Void> callback) {
        execute("delete", key, false, callback, localCache -> {
            this.store.delete(key, () -> {
            });
            this.cache.remove(key);
            deleteTask.executeAfterMark(callback);
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void snapshot(Key key, DocumentSnapshot documentSnapshot, Callback<Integer> callback) {
        if (documentSnapshot.history <= 0) {
            callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_UNABLE_TO_COMPACT_NON_POSITIVE_HISTORY));
            return;
        }
        Events.Snapshot snapshot = new Events.Snapshot();
        snapshot.seq = documentSnapshot.seq;
        snapshot.document = documentSnapshot.json;
        snapshot.history = documentSnapshot.history;
        snapshot.assetBytes = documentSnapshot.assetBytes;
        ByteBuf buffer = Unpooled.buffer();
        EventCodec.write(buffer, snapshot);
        byte[] convert = ByteArrayHelper.convert(buffer);
        execute("snapshot", key, true, callback, localCache -> {
            if (localCache.snapshotInvalid(documentSnapshot.seq)) {
                this.LOGGER.error("failed-snapshot-due-race: snapshot:" + documentSnapshot.seq + " cached:" + localCache.seq() + ";" + key.space + "/" + key.key);
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_REJECT_FUTURE_SNAPSHOT));
            } else if (this.store.append(key, convert, documentSnapshot.seq, documentSnapshot.assetBytes, () -> {
            }) == null) {
                this.LOGGER.error("out-of-space: caravan unable to snapshot:" + key.space + "/" + key.key);
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_OUT_OF_SPACE_SNAPSHOT));
            } else {
                localCache.handle(snapshot);
                this.store.trim(key, Math.max(documentSnapshot.history, localCache.getMinimumHistoryToPreserve()), () -> {
                    callback.success(0);
                });
            }
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void recover(Key key, DocumentRestore documentRestore, Callback<Void> callback) {
        Events.Recover recover = new Events.Recover();
        recover.seq = documentRestore.seq;
        recover.agent = documentRestore.who.agent;
        recover.authority = documentRestore.who.authority;
        recover.document = documentRestore.document;
        ByteBuf buffer = Unpooled.buffer();
        EventCodec.write(buffer, recover);
        byte[] convert = ByteArrayHelper.convert(buffer);
        execute("recover", key, true, callback, localCache -> {
            if (this.store.append(key, convert, documentRestore.seq, 0L, () -> {
            }) == null) {
                this.LOGGER.error("out-of-space: caravan unable to recover document:" + key.space + "/" + key.key);
                callback.failure(new ErrorCodeException(ErrorCodes.CARAVAN_OUT_OF_SPACE_SNAPSHOT));
            } else {
                localCache.handle(recover);
                callback.success(null);
            }
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void shed(Key key) {
        close(key, Callback.DONT_CARE_VOID);
    }

    @Override // org.adamalang.runtime.data.DataService
    public void inventory(final Callback<Set<Key>> callback) {
        this.executor.execute(new NamedRunnable("caravan-inventory", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.13
            @Override // org.adamalang.common.NamedRunnable
            public void execute() throws Exception {
                callback.success(new TreeSet(CaravanDataService.this.store.map().keySet()));
            }
        });
    }

    @Override // org.adamalang.runtime.data.DataService
    public void close(Key key, Callback<Void> callback) {
        execute("close", key, false, callback, localCache -> {
            if (localCache != null) {
                this.cache.remove(key);
            }
            callback.success(null);
        });
    }

    public CountDownLatch flush(final boolean z) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.executor.execute(new NamedRunnable("flush", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.14
            @Override // org.adamalang.common.NamedRunnable
            public void execute() throws Exception {
                CaravanDataService.this.store.flush(z);
                countDownLatch.countDown();
            }
        });
        return countDownLatch;
    }

    public CountDownLatch shutdown() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.executor.execute(new NamedRunnable("flush", new String[0]) { // from class: org.adamalang.caravan.CaravanDataService.15
            @Override // org.adamalang.common.NamedRunnable
            public void execute() throws Exception {
                CaravanDataService.this.store.flush(true);
                CaravanDataService.this.store.shutdown();
                countDownLatch.countDown();
            }
        });
        return countDownLatch;
    }
}
