16#include <zypp-core/zyppng/rpc/MessageStream>
19#include <zypp-media/MediaException>
20#include <zypp-media/auth/CredentialManager>
22#include <zypp/APIConfig.h>
31 return (
_request->code () == ProvideMessage::Code::Attach );
38 return ( _request->code () == ProvideMessage::Code::Prov );
45 return ( _request->code () == ProvideMessage::Code::Detach );
55 DBG <<
"Queue shutdown with Items still running" << std::endl;
64 ERR <<
"Queue Worker was already initialized" << std::endl;
71 MIL <<
"Trying to start " <<
pN << std::endl;
73 if ( !
pi.isExist() ) {
78 if ( !
pi.userMayX() ) {
79 ERR <<
"Failed to start worker for " <<
workerScheme <<
" binary " <<
pi.asString() <<
" is not executable." << std::endl;
110 if (
i.isDetachRequest () )
112 return i._request.get() ==
item;
118 if (
item->code() != ProvideMessage::Code::Attach
119 &&
item->code() != ProvideMessage::Code::Prov ) {
120 ERR <<
"Can not cancel a " <<
item->code() <<
" request!" << std::endl;
126 reqRef->setCurrentQueue(
nullptr);
142 it->_request->setCurrentQueue(
nullptr );
194 reqRef->setCurrentQueue(
nullptr);
203 ERR <<
"Failed to send cancel message to worker" << std::endl;
206 reqRef->setCurrentQueue(
nullptr);
223 if ( !
reqRef->activeUrl() ) {
224 ERR <<
"Item without active URL enqueued, this is a BUG." << std::endl;
231 ERR <<
"Failed to send message to worker process." << std::endl;
244 _idleSince = std::chrono::steady_clock::now();
283 if (
i.isDetachRequest () )
287 if (
reqRef->code() != ProvideMessage::Code::Prov )
292 if (
i.isDetachRequest () )
295 if (
reqRef->code() != ProvideMessage::Code::Prov )
325 ERR <<
"Failed to execute worker" << std::endl;
353 ERR <<
"Failed to send initial message to queue worker" << std::endl;
362 if ( !
caps ||
caps->messagetypename() != WorkerCaps::staticTypeName() ) {
363 ERR <<
"Worker did not sent a capabilities message, aborting" << std::endl;
391 ERR <<
"Ignoring invalid request!" << std::endl;
396 if ( ! elem._request )
398 return exp->requestId() == elem._request->provideMessage().requestId();
402 ERR <<
"Ignoring unknown request ID: " <<
exp->requestId() << std::endl;
412 ERR <<
"Failed to send Error message to worker process." << std::endl;
424 if ( msg->messagetypename() == ProvideMessage::staticTypeName() ) {
437 MIL <<
"Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
445 auto &
reqRef =req._request;
447 const auto code =
provMsg->code();
449 if ( code >= ProvideMessage::Code::FirstInformalCode && code <= ProvideMessage::Code::LastInformalCode ) {
456 }
else if ( code >= ProvideMessage::Code::FirstSuccessCode && code <= ProvideMessage::Code::LastSuccessCode ) {
464 if ( code == ProvideMessage::Code::ProvideFinished ) {
470 std::optional<zypp::ManagedFile> dataRef;
472 if ( !
reqIter->isFileRequest() ) {
473 ERR <<
"Invalid message for request ID: " <<
reqIter->_request->provideMessage().requestId() << std::endl;
484 MIL <<
"CACHE MISS, file " <<
locFName <<
" was already removed, queueing again" << std::endl;
499 reqRef->setCurrentQueue(
nullptr);
511 reqRef->setCurrentQueue(
nullptr);
518 }
else if ( code >= ProvideMessage::Code::FirstClientErrCode && code <= ProvideMessage::Code::LastSrvErrCode ) {
527 reqRef->setCurrentQueue(
nullptr);
536 }
else if ( code >= ProvideMessage::Code::FirstRedirCode && code <= ProvideMessage::Code::LastRedirCode ) {
546 reqRef->setCurrentQueue(
nullptr);
553 }
else if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
555 ERR <<
"Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
559 }
else if ( code >= ProvideMessage::Code::FirstWorkerCode && code <= ProvideMessage::Code::LastWorkerCode ) {
561 if ( code == ProvideMessage::Code::AuthDataRequest ) {
563 ERR <<
"Invalid message for request ID: " <<
reqRef->provideMessage().requestId() << std::endl;
570 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Item was cancelled") )
577 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Request has no owner" ) )
582 if ( !
reqRef->activeUrl() ) {
583 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"Item has no active URL, this is a bug." ) )
591 std::map<std::string, std::string>
extraVals;
599 WAR <<
"Ignoring non string value for " << name << std::endl;
609 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData,
"No auth given by user." ) )
616 ERR <<
"Failed to send AuthorizationInfo to worker process." << std::endl;
629 }
else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
631 if ( !
reqIter->isAttachRequest() ) {
632 ERR <<
"Invalid message for request ID: " <<
reqIter->_request->provideMessage().requestId() << std::endl;
639 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort,
"Item was cancelled" ) )
644 MIL <<
"Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
649 freeDevs.push_back( val.asString() );
652 std::optional<std::string>
desc;
668 MIL <<
"Sending back a MediaChanged message, retrying to find medium " << std::endl;
671 ERR <<
"Failed to send MediaChanged to worker process." << std::endl;
678 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
679 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort,
"Cancelled by User" ) )
684 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
685 if ( !
sendErrorToWorker(
reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip,
"Skipped by User" ) )
692 ERR <<
"Unsupported worker request: "<<code<<
", this is a fatal error!" << std::endl;
699 ERR <<
"Received unsupported message " << msg->messagetypename() <<
" with code " << code <<
" ignoring! " << std::endl;
703 ERR <<
"Received unsupported message " << msg->messagetypename() <<
"ignoring" << std::endl;
716 while ( !
ba.empty() ) {
768 MIL <<
"Unexpected queue worker exit with code: " << exitCode << std::endl;
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
void reset()
Reset to default Ctor values.
Store and operate with byte count.
Base class for Exception.
static LogControl instance()
Singleton access.
Wrapper class for stat/lstat.
const char * c_str() const
String representation.
const std::string & asString() const
String representation.
bool empty() const
Test for an empty path.
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
SignalProxy< void(uint)> sigChannelReadyRead()
SignalProxy< void(int)> sigFinished()
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createCancel(const uint32_t reqId)
static ProvideMessage createMediaChanged(const uint32_t reqId)
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
const std::string queueName(ProvideQueue &q) const
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) _sigMediaChange)
bool isInCache(const zypp::Pathname &downloadedFile) const
void schedule(ScheduleReason reason)
const zypp::Pathname & workerPath() const
RpcMessageStreamPtr _messageStream
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
zypp::ByteCount expectedProvideSize() const
ProvideQueue(ProvidePrivate &parent)
void immediateShutdown(const std::exception_ptr &reason)
std::deque< Item > _waitQueue
void cancel(ProvideRequest *item, std::exception_ptr error)
uint requestCount() const
zypp::Pathname _currentExe
bool canScheduleMore() const
void processReadyRead(int channel)
const std::string & hostname() const
std::optional< TimePoint > _idleSince
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
const Config & workerConfig() const
std::optional< TimePoint > idleSince() const
void procFinished(int exitCode)
uint activeRequests() const
std::list< Item > _activeItems
void forwardToLog(std::string &&logLine)
void enqueue(ProvideRequestRef request)
SignalProxy< void()> sigIdle()
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
static constexpr uint32_t InvalidId
SignalProxy< void()> sigMessageReceived()
static Ptr create(IODevice::Ptr iostr)
const std::string & worker_name() const
WorkerType worker_type() const
int unlink(const Pathname &path)
Like 'unlink'.
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
constexpr std::string_view EffectiveUrl("effective_url")
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
constexpr std::string_view LocalFilename("local_filename")
constexpr std::string_view CacheHit("cacheHit")
constexpr std::string_view ExpectedFilesize("expected_filesize")
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
bool provideDebugEnabled()
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
bool isDetachRequest() const
ProvideRequestRef _request
bool isAttachRequest() const
bool isFileRequest() const
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.