11#include <zypp-core/base/DtorReset>
15#include <zypp-core/zyppng/pipelines/AsyncResult>
20#include <zypp-core/zyppng/base/AutoDisconnect>
21#include <zypp-core/zyppng/base/EventDispatcher>
22#include <zypp-media/MediaConfig>
28#undef ZYPP_BASE_LOGGER_LOGGROUP
29#define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
81 if ( !
_controlIO->openFds( { recv }, send ) ) {
123 return std::make_shared<ProvideWorkerItem>( std::move(spec) );
133 ERR <<
"Failed to send ProvideStart message" << std::endl;
139 MIL_PRV <<
"Sending provideSuccess for id " <<
id <<
" file " <<
localFile << std::endl;
141 for (
auto i =
extra.beginList ();
i !=
extra.endList();
i++ ) {
142 for (
const auto &val :
i->second )
143 msg.addValue(
i->first, val );
145 if ( !
_stream->sendMessage( msg ) ) {
146 ERR <<
"Failed to send ProvideSuccess message" << std::endl;
152 MIL_PRV <<
"Sending provideFailed for request " <<
id <<
" err: " <<
reason << std::endl;
154 for (
auto i =
extra.beginList ();
i !=
extra.endList();
i++ ) {
155 for (
const auto &val :
i->second )
156 msg.addValue(
i->first, val );
158 if ( !
_stream->sendMessage( msg ) ) {
159 ERR <<
"Failed to send ProvideFailed message" << std::endl;
167 if ( !
e.historyEmpty() ) {
180 MIL_PRV <<
"Sending attachSuccess for request " <<
id << std::endl;
182 ERR <<
"Failed to send AttachFinished message" << std::endl;
184 MIL <<
"Sent back attach success" << std::endl;
190 MIL_PRV <<
"Sending detachSuccess for request " <<
id << std::endl;
192 ERR <<
"Failed to send DetachFinished message" << std::endl;
210 const auto &msg =
_stream->nextMessageWait() | [&](
auto &&nextMessage ) {
211 if ( !nextMessage ) {
223 ERR <<
"Failed to receive message" << std::endl;
232 MIL <<
"Remembering message for later: " << msg->code () << std::endl;
244 MIL <<
"Failed to wait for message, aborting the request " << std::endl;
248 if (
m->code() == ProvideMessage::Code::MediaChanged )
250 else if (
m->code() == ProvideMessage::Code::MediaChangeSkip )
261 if (
m.code() == ProvideMessage::Code::AuthInfo ) {
264 m.forEachVal( [&]( const std::string &name, const ProvideMessage::FieldVal &val ) {
265 if ( name == AuthInfoMsgFields::Username ) {
266 inf.username = val.asString();
267 } else if ( name == AuthInfoMsgFields::Password ) {
268 inf.password = val.asString();
269 } else if ( name == AuthInfoMsgFields::AuthTimestamp ) {
270 inf.last_auth_timestamp = val.asInt64();
272 if ( !val.isString() ) {
273 ERR <<
"Ignoring invalid extra value, " << name <<
" is not of type string" << std::endl;
275 inf.extraKeys[name] = val.asString();
288 return *_controlIO.get();
293 const auto &
helo = _stream->nextMessageWait();
295 ERR <<
"Could not receive a handshake message, aborting" << std::endl;
301 invalidMessageReceived(
exp.error() );
305 return std::move(*
exp) | [&](
auto &&
conf ) {
307 _workerConf = std::move(
conf);
310 for(
const auto &[key,value] : _workerConf ) {
312 if (
keyUrl.getScheme() ==
"zconfig" &&
keyUrl.getAuthority() ==
"main" ) {
319 caps.set_worker_name( _workerName.data() );
322 if ( !_stream->sendMessage (
caps ) ) {
330 void ProvideWorker::messageLoop(
Timer & )
335 while ( _pendingMessages.size () ) {
336 auto m = _pendingMessages.front ();
337 _pendingMessages.pop_front ();
338 handleSingleMessage(
m);
341 if ( !_fatalError && _pendingProvides.size() ) {
346 if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
352 void ProvideWorker::maybeDelayedShutdown()
354 if ( _inControllerRequest ) {
355 _delayedShutdown->start(0);
365 MIL <<
"Read FD closed, exiting." << std::endl;
366 maybeDelayedShutdown();
371 MIL <<
"Write FD closed, exiting." << std::endl;
372 maybeDelayedShutdown();
375 void ProvideWorker::messageReceived()
377 while (
auto message = _stream->nextMessage() ) {
380 pushSingleMessage(*message);
384 void ProvideWorker::onInvalidMessageReceived()
386 invalidMessageReceived( std::exception_ptr() );
389 void ProvideWorker::invalidMessageReceived( std::exception_ptr
p )
391 ERR <<
"Received a invalid message on the input stream, aborting" << std::endl;
402 const auto code = provide.code();
404 if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
406 MIL_PRV <<
"Received request: " << code << std::endl;
408 if ( code == ProvideMessage::Code::Cancel ) {
409 const auto &
i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [
id = provide.requestId() ](
const auto &
it ){ return it->_spec.requestId() == id; } );
410 if (
i != _pendingProvides.end() ) {
411 switch ( (*i)->_state ) {
412 case ProvideWorkerItem::Pending:
414 _pendingProvides.erase(
i);
416 case ProvideWorkerItem::Running:
419 case ProvideWorkerItem::Finished:
422 MIL <<
"Received Cancel for unknown request: " << provide.requestId() <<
", ignoring!" << std::endl;
430 ERR <<
"Unsupported request with code: " << code <<
" received!" << std::endl;
433 void ProvideWorker::pushSingleMessage(
const RpcMessage &message )
435 const auto &handle = [&](
const RpcMessage &message ){
437 if (
msgTypeName == ProvideMessage::staticTypeName() ) {
438 return parseReceivedMessage( message )
440 _pendingMessages.push_back(provide);
448 const auto &
exp = handle( message );
451 std::rethrow_exception (
exp.error () );
453 ERR <<
"Catched exception during message handling: " <<
e << std::endl;
454 }
catch (
const std::exception &
e ) {
455 ERR <<
"Catched exception during message handling: " <<
e.what()<< std::endl;
457 ERR <<
"Unknown Exception during message handling" << std::endl;
466 invalidMessageReceived(
exp.error() );
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Assign a vaiable a certain value when going out of scope.
Base class for Exception.
static LogControl instance()
Singleton access.
std::string basename() const
Return the last component of this path.
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
static ProvideMessage createDetachFinished(const uint32_t reqId)
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
static ProvideMessage createAttachFinished(const uint32_t reqId, const std::optional< std::string > &localMountPoint={})
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
SignalProxy< void()> sigMessageReceived()
std::shared_ptr< RpcMessageStream > Ptr
SignalProxy< void()> sigInvalidMessageReceived()
static Ptr create(IODevice::Ptr iostr)
const std::string & messagetypename() const
The Timer class provides repetitive and single-shot timers.
SignalProxy< void(Timer &t) sigExpired)()
This signal is always emitted when the timer expires.
static expected success(ConsParams &&...params)
void detachSuccess(const uint32_t id)
std::deque< ProvideWorkerItemRef > _pendingProvides
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
void messageLoop(Timer &)
void attachSuccess(const uint32_t id, const std::optional< std::string > &localMountPoint={})
ProvideWorker(std::string_view workerName)
expected< void > executeHandshake()
AsyncDataSource::Ptr _controlIO
void onInvalidMessageReceived()
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
std::deque< ProvideMessage > _pendingMessages
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
Timer::Ptr _delayedShutdown
void maybeDelayedShutdown()
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
RpcMessageStream::Ptr messageStream() const
RpcMessageStream::Ptr _stream
ProvideNotificatioMode provNotificationMode() const
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
void provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
std::deque< ProvideWorkerItemRef > & requestQueue()
ProvideNotificatioMode _provNotificationMode
expected< ProvideMessage > parseReceivedMessage(const RpcMessage &m)
bool _inControllerRequest
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
std::exception_ptr _fatalError
Easy-to use interface to the ZYPP dependency resolver.
constexpr std::string_view History("history")
auto and_then(Fun &&function)
ResultType and_then(const expected< T, E > &exp, Function &&f)
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
static ThreadData & current()
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.