.. _program_listing_file_src_translator_service.cpp:

Program Listing for File service.cpp
====================================

|exhale_lsh| :ref:`Return to documentation for file ` (``src/translator/service.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "service.h"

   // NOTE(review): this listing has lost every span that was enclosed in angle brackets: the header names of the two
   // bare `#include` lines below, and the template arguments of `std::vector`, `std::shared_ptr`, `std::optional`,
   // `std::make_optional`, `std::make_shared` and `Ptr` throughout. As shown, the code does not compile. Regenerate the
   // listing from src/translator/service.cpp rather than hand-editing it.
   #include
   #include

   #include "batch.h"
   #include "byte_array_util.h"
   #include "definitions.h"

   namespace marian {
   namespace bergamot {

   namespace {

   // Combines two responses with first.target == second.source, mapping alignments etc. accordingly.
   // There are several constraints which are matched only by the pivoting workflow in <>Service source, therefore this
   // function is not for external use and lives in a hidden namespace.
   //
   // Both arguments are consumed:
   //   - the result takes `source` from `first`;
   //   - it takes `target` and `qualityScores` from `second`;
   //   - alignments are remapped through the pivot (remapAlignments(first, second)) only when `first` has any,
   //     otherwise combined.alignments is left default-constructed.
   Response combine(Response &&first, Response &&second) {
     Response combined;

     // Compute alignment first using internal matrices and mappings. This must happen before `first` and `second` are
     // moved from below, since remapAlignments reads both responses.
     if (first.alignments.size()) {
       combined.alignments = remapAlignments(first, second);
     }

     combined.source = std::move(first.source);
     combined.target = std::move(second.target);
     combined.qualityScores = std::move(second.qualityScores);

     return combined;
   }

   // Returns an engaged optional holding a cache constructed from (size, mutexBuckets) when size > 0. A size of 0 yields
   // std::nullopt, i.e. no cache is created at all.
   std::optional makeOptionalCache(size_t size, size_t mutexBuckets) {
     return size > 0 ? std::make_optional(size, mutexBuckets) : std::nullopt;
   }

   }  // namespace

   // The cache gets a single mutex bucket here. Presumably this is because BlockingService only processes batches on the
   // calling thread (every translateBatch call below uses deviceId 0). NOTE(review): confirm.
   BlockingService::BlockingService(const BlockingService::Config &config)
       : config_(config),
         requestId_(0),
         batchingPool_(),
         cache_(makeOptionalCache(config.cacheSize, /*mutexBuckets = */ 1)),
         logger_(config.logger) {}

   // Translates `sources` with `translationModel`, handling markup for each source according to
   // responseOptions[i].HTML. An HTML object is built from each source before translation; after translation it
   // restores the markup onto the matching response.
   // NOTE(review): responseOptions is indexed for every i < sources.size(); callers must pass at least that many
   // entries. This is not checked here.
   // NOTE(review): sources[i] is handed to the HTML constructor via std::move, and the same `sources` vector is then
   // moved into translateMultipleRaw. This is only correct if that constructor edits the string in place (binding it as
   // an rvalue reference) instead of moving out of it. Verify against the HTML class.
   std::vector BlockingService::translateMultiple(std::shared_ptr translationModel, std::vector &&sources,
                                                  const std::vector &responseOptions) {
     std::vector htmls;
     for (size_t i = 0; i < sources.size(); i++) {
       htmls.emplace_back(std::move(sources[i]), responseOptions[i].HTML);
     }
     std::vector responses = translateMultipleRaw(translationModel, std::move(sources), responseOptions);
     for (size_t i = 0; i < responses.size(); i++) {
       htmls[i].restore(responses[i]);
     }

     return responses;
   }

   // Translates `sources` with no HTML handling.
   // - One request per source is enqueued. Request i's callback writes into responses[i], so the output order matches
   //   the input order.
   // - The batching pool is then drained synchronously on the calling thread (deviceId 0) before returning.
   // The callbacks capture `responses` by reference. This assumes every callback has fired once generateBatch() stops
   // producing batches. NOTE(review): confirm against the batching pool.
   std::vector BlockingService::translateMultipleRaw(std::shared_ptr translationModel, std::vector &&sources,
                                                     const std::vector &responseOptions) {
     std::vector responses;
     responses.resize(sources.size());

     for (size_t i = 0; i < sources.size(); i++) {
       auto callback = [i, &responses](Response &&response) { responses[i] = std::move(response); };  //
       Ptr request =
           translationModel->makeRequest(requestId_++, std::move(sources[i]), callback, responseOptions[i], cache_);
       batchingPool_.enqueueRequest(translationModel, request);
     }

     Batch batch;
     Ptr model{nullptr};
     while (batchingPool_.generateBatch(model, batch)) {
       model->translateBatch(/*deviceId=*/0, batch);
     }

     return responses;
   }

   // Two-leg synchronous translation:
   //   1. source -> pivot with `first`;
   //   2. pivot -> target with `second`;
   //   3. the two halves of each input are joined by index with combine().
   // Markup is handled per responseOptions[i].HTML and restored on the final, combined responses. The same
   // responseOptions[i] is used for both legs.
   // NOTE(review): the same index-size assumption and std::move-into-HTML caveat as in translateMultiple apply here.
   std::vector BlockingService::pivotMultiple(std::shared_ptr first, std::shared_ptr second, std::vector &&sources,
                                              const std::vector &responseOptions) {
     std::vector htmls;
     for (size_t i = 0; i < sources.size(); i++) {
       htmls.emplace_back(std::move(sources[i]), responseOptions[i].HTML);
     }

     // Translate source to pivots. This is the same as calling translateMultiple, minus the HTML step (done above).
     std::vector sourcesToPivots;
     sourcesToPivots = translateMultipleRaw(first, std::move(sources), responseOptions);

     // Translate pivots to targets, after we have outputs at pivot from first round. We cannot use translateMultiple here
     // because we need consistency at the pivot on both sides.
     std::vector pivotsToTargets;
     pivotsToTargets.resize(sourcesToPivots.size());

     for (size_t i = 0; i < sourcesToPivots.size(); i++) {
       // We cannot eliminate this copy, as we need two versions of intermediate. sourcesToPivots[i] is still passed
       // whole to combine() below, while this copy is moved into makePivotRequest.
       AnnotatedText intermediate = sourcesToPivots[i].target;
       auto callback = [i, &pivotsToTargets](Response &&response) { pivotsToTargets[i] = std::move(response); };  //

       Ptr request =
           second->makePivotRequest(requestId_++, std::move(intermediate), callback, responseOptions[i], cache_);
       batchingPool_.enqueueRequest(second, request);
     }

     Batch batch;
     Ptr model{nullptr};
     while (batchingPool_.generateBatch(model, batch)) {
       model->translateBatch(/*deviceId=*/0, batch);
     }

     // Combine both sides. They're associated by indices.
     std::vector finalResponses;
     for (size_t i = 0; i < sourcesToPivots.size(); i++) {
       Response finalResponse = combine(std::move(sourcesToPivots[i]), std::move(pivotsToTargets[i]));
       finalResponses.push_back(std::move(finalResponse));
     }

     for (size_t i = 0; i < finalResponses.size(); i++) {
       htmls[i].restore(finalResponses[i]);
     }

     return finalResponses;
   }

   // Starts config_.numWorkers worker threads and aborts if that number is 0. Each worker repeatedly pulls a batch (and
   // its model) from safeBatchingPool_ and translates it, passing its own index as the device id to translateBatch().
   // The cache gets one mutex bucket per worker.
   // NOTE(review): cache_ is initialized from the member config_ rather than the parameter `config`. Members are
   // initialized in declaration order, not in the order written here, so this is only safe if config_ is declared before
   // cache_ in the class. Verify against service.h, or read `config.cacheSize` / `config.numWorkers` directly.
   // NOTE(review): numWorkers is validated (ABORT_IF) only after cache_ has already been built with numWorkers buckets.
   AsyncService::AsyncService(const AsyncService::Config &config)
       : requestId_(0),
         config_(config),
         safeBatchingPool_(),
         cache_(makeOptionalCache(config_.cacheSize, /*mutexBuckets=*/config_.numWorkers)),
         logger_(config.logger) {
     ABORT_IF(config_.numWorkers == 0, "Number of workers should be at least 1 in a threaded workflow");
     workers_.reserve(config_.numWorkers);
     for (size_t cpuId = 0; cpuId < config_.numWorkers; cpuId++) {
       workers_.emplace_back([cpuId, this] {
         // Consumer thread main-loop. Note that this is an infinite-loop unless the monitor is explicitly told to
         // shutdown, which happens in the destructor for this class.
         Batch batch;
         Ptr translationModel{nullptr};
         while (safeBatchingPool_.generateBatch(translationModel, batch)) {
           translationModel->translateBatch(cpuId, batch);
         }
       });
     }
   }

   // Forwards to safeBatchingPool_.clear(). Presumably this drops requests that are queued but not yet batched; confirm
   // against the pool implementation.
   void AsyncService::clear() { safeBatchingPool_.clear(); }

   // Tells the batching pool to shut down, which ends each worker's generateBatch() loop, then joins every worker.
   AsyncService::~AsyncService() {
     safeBatchingPool_.shutdown();
     for (std::thread &worker : workers_) {
       assert(worker.joinable());
       worker.join();
     }
     workers_.clear();
   }

   // Asynchronous two-leg translation: source -> pivot with `first`, then pivot -> target with `second`.
   // clientCallback receives the combined response after markup has been restored (per responseOptions.HTML).
   // NOTE(review): internalCallback runs when the first leg completes, presumably on a worker thread, and it increments
   // requestId_. If requestId_ is a plain integer rather than an atomic, this races with producer threads calling
   // translate()/pivot(), and with other workers. Confirm the member's type.
   // Both lambdas capture `this`, so this AsyncService must outlive any request still in flight. The `this` capture in
   // joiningCallback appears unused.
   void AsyncService::pivot(std::shared_ptr first, std::shared_ptr second, std::string &&source,
                            CallbackType clientCallback, const ResponseOptions &responseOptions) {
     Ptr html = std::make_shared(std::move(source), responseOptions.HTML);
     // This is callback chaining, or continuation-passing style (CPS), due to async.
     // We create a callback (internalCallback) which feeds the result of `first` into a second translation. That second
     // translation is supplied with a callback (joiningCallback) which merges both results and creates our final
     // response.
     //
     auto internalCallback = [this, clientCallback, second, responseOptions, html](Response &&sourceToPivot) {
       // We cannot eliminate the following copy, as we need two versions of intermediate. Holding
       // it in a copy allows moving the response into the lambda below.
       AnnotatedText intermediate = sourceToPivot.target;

       // https://stackoverflow.com/a/65606554/4565794
       // Moving out of a by-value capture requires a mutable lambda, and can only be done once. It's only done once in
       // our case, so this is fine.
       auto joiningCallback = [this, sourceToPivot = std::move(sourceToPivot), clientCallback,
                               html](Response &&pivotToTarget) mutable {
         // We have both Responses at this callback: sourceToPivot is moved in, and the second half becomes available
         // when that translation completes.
         Response finalResponse = combine(std::move(sourceToPivot), std::move(pivotToTarget));

         // Sentences should be consistent now, give way to client.
         html->restore(finalResponse);
         clientCallback(std::move(finalResponse));
       };

       // Second call.
       Ptr request =
           second->makePivotRequest(requestId_++, std::move(intermediate), joiningCallback, responseOptions, cache_);
       safeBatchingPool_.enqueueRequest(second, request);
     };

     // First call.
     translateRaw(first, std::move(source), internalCallback, responseOptions);
   }

   // Asynchronously translates `source`. `callback` receives the response after markup has been restored (per
   // responseOptions.HTML).
   // NOTE(review): `source` is passed to the HTML constructor via std::move and then again to translateRaw. This relies
   // on that constructor editing the string in place rather than moving out of it.
   void AsyncService::translate(std::shared_ptr translationModel, std::string &&source, CallbackType callback,
                                const ResponseOptions &responseOptions) {
     // Producer thread, a call to this function adds new work items. If batches are available, notifies workers waiting.
     Ptr html = std::make_shared(std::move(source), responseOptions.HTML);
     auto internalCallback = [html, callback](Response &&response) {
       html->restore(response);
       callback(std::move(response));
     };
     translateRaw(translationModel, std::move(source), internalCallback, responseOptions);
   }

   // Creates a request with the next requestId_ and enqueues it on safeBatchingPool_ for the worker threads. `callback`
   // is handed to the request unchanged; no HTML handling happens here.
   void AsyncService::translateRaw(std::shared_ptr translationModel, std::string &&source, CallbackType callback,
                                   const ResponseOptions &responseOptions) {
     // Producer thread, a call to this function adds new work items. If batches are available, notifies workers waiting.
     Ptr request = translationModel->makeRequest(requestId_++, std::move(source), callback, responseOptions, cache_);
     safeBatchingPool_.enqueueRequest(translationModel, request);
   }

   }  // namespace bergamot
   }  // namespace marian