Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1// @HEADER
2// ***********************************************************************
3//
4// Tpetra: Templated Linear Algebra Services Package
5// Copyright (2008) Sandia Corporation
6//
7// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8// the U.S. Government retains certain rights in this software.
9//
10// Redistribution and use in source and binary forms, with or without
11// modification, are permitted provided that the following conditions are
12// met:
13//
14// 1. Redistributions of source code must retain the above copyright
15// notice, this list of conditions and the following disclaimer.
16//
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// 3. Neither the name of the Corporation nor the names of the
22// contributors may be used to endorse or promote products derived from
23// this software without specific prior written permission.
24//
25// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36//
37// Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38//
39// ************************************************************************
40// @HEADER
41
42#ifndef TPETRA_DETAILS_READTRIPLES_HPP
43#define TPETRA_DETAILS_READTRIPLES_HPP
44
53
54#include "TpetraCore_config.h"
55#include "Tpetra_Details_PackTriples.hpp"
56#include "Kokkos_ArithTraits.hpp"
57#include "Teuchos_MatrixMarket_generic.hpp"
58#include "Teuchos_CommHelpers.hpp"
59#include <iostream>
60#include <typeinfo> // for debugging
61
62namespace Tpetra {
63namespace Details {
64
65//
66// Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
67// interface. I put "public" in quotes because it's public only for
68// Tpetra developers, NOT for Tpetra users.
69//
70
71namespace Impl {
72
73// mfh 01 Feb 2017: Unfortunately,
74// Teuchos::MatrixMarket::readComplexData requires Teuchos to have
75// complex arithmetic support enabled. To avoid this issue, I
76// reimplement the function here. It's not very long.
77
// Read "<rowIndex> <colIndex> <realPart> <imagPart>" from a line of
// input, as summarized in the member index at the end of this listing.
//
// Visible contract: on each failure path below, return false if
// tolerant is true; otherwise throw std::invalid_argument with a
// message that names lineNumber.  Return true when no failure path
// was taken.
//
// NOTE(review): the embedded listing numbers jump (112 -> 117,
// 121 -> 124, 144 -> 146, 156 -> 159), so the output parameters, the
// readRealData call with its failure test, and the extraction of the
// imaginary part are not visible here.  Confirm against the header.
110template<class OrdinalType, class RealType>
111bool
112readComplexData (std::istream& istr,
117 const std::size_t lineNumber,
118 const bool tolerant)
119{
120 using ::Teuchos::MatrixMarket::readRealData;
121
124 if (tolerant) {
125 return false;
126 }
127 else {
128 std::ostringstream os;
129 os << "Failed to read pattern data and/or real value from line "
130 << lineNumber << " of input";
131 throw std::invalid_argument(os.str());
132 }
133 }
// End of stream at this point is reported as missing data after the
// real value.
134 if (istr.eof ()) {
135 if (tolerant) {
136 return false;
137 }
138 else {
139 std::ostringstream os;
140 os << "No more data after real value on line "
141 << lineNumber << " of input";
142 throw std::invalid_argument (os.str ());
143 }
144 }
// A failed stream at this point is reported as a failed read of the
// imaginary value.
146 if (istr.fail ()) {
147 if (tolerant) {
148 return false;
149 }
150 else {
151 std::ostringstream os;
152 os << "Failed to get imaginary value from line "
153 << lineNumber << " of input";
154 throw std::invalid_argument (os.str ());
155 }
156 }
159 return true;
160}
161
162
// Primary template of ReadLine.  It only declares the static method
// readLine; no definition of it is visible in this listing.  The
// third template parameter defaults to ArithTraits<SC>::is_complex.
//
// readLine takes the closure processTriple (invoked with two GO
// values and an SC value, returning int), the text of one line, that
// line's number, a tolerant flag, an optional error stream (may be
// NULL), and a debug flag.  The member index at the end of this
// listing summarizes it as processing the sparse matrix entry found
// in the given line.
172template<class SC,
173 class GO,
174 const bool isComplex = ::Kokkos::Details::ArithTraits<SC>::is_complex>
175struct ReadLine {
196 static int
197 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
198 const std::string& line,
199 const std::size_t lineNumber,
200 const bool tolerant = false,
201 std::ostream* errStrm = NULL,
202 const bool debug = false);
203};
204
// Partial specialization of ReadLine for isComplex == true.
//
// readLine wraps the line in an std::istringstream and parses it
// inside a try block.  If a std::exception is caught, the message is
// written to errStrm (when non-NULL) and the function returns -1.
// Otherwise it returns errCode, and reports a nonzero errCode to
// errStrm (when non-NULL).
//
// NOTE(review): listing lines 247, 254-255 and 277 are absent, so the
// declarations of the real and imaginary parts, the parse call
// (presumably readComplexData, per the comment and the error message
// below), and the call that produces errCode (presumably
// processTriple) are not visible.  Confirm against the header.
212template<class SC, class GO>
213struct ReadLine<SC, GO, true> {
234 static int
235 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
236 const std::string& line,
237 const std::size_t lineNumber,
238 const bool tolerant = false,
239 std::ostream* errStrm = NULL,
240 const bool debug = false)
241 {
242 using ::Teuchos::MatrixMarket::checkCommentLine;
243 typedef typename ::Kokkos::Details::ArithTraits<SC>::mag_type real_type;
244 using std::endl;
245
246 GO rowInd, colInd;
248 std::istringstream istr (line);
249 bool success = true;
250 try {
251 // Use the version of this function in this file, not the
252 // version in Teuchos_MatrixMarket_generic.hpp, because the
253 // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
256 }
257 catch (std::exception& e) {
// An exception is converted into a failure flag, so the caller sees
// a return code of -1 instead of the exception.
258 success = false;
259 if (errStrm != NULL) {
260 std::ostringstream os;
261 os << "readLine: readComplexData threw an exception: " << e.what ()
262 << endl;
263 *errStrm << os.str ();
264 }
265 }
266
267 if (success) {
268 // if (debug && errStrm != NULL) {
269 // std::ostringstream os;
270 // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
271 // << ", realPart=" << realPart << ", imagPart=" << imagPart
272 // << std::endl;
273 // *errStrm << os.str ();
274 // }
275 // This line may have side effects.
276 const int errCode =
278 if (errCode != 0 && errStrm != NULL) {
279 std::ostringstream os;
280 os << "readLine: processTriple returned " << errCode << " != 0."
281 << endl;
282 *errStrm << os.str ();
283 }
284 return errCode;
285 }
286 else {
287 return -1;
288 }
289 }
290};
291
// Partial specialization of ReadLine for isComplex == false.
//
// readLine wraps the line in an std::istringstream and parses it
// inside a try block.  If a std::exception is caught, the message is
// written to errStrm (when non-NULL) and the function returns -1.
// Otherwise it calls processTriple (rowInd, colInd, val) and returns
// that result, reporting a nonzero result to errStrm (when non-NULL).
// With debug set and errStrm non-NULL, the parsed entry is echoed to
// errStrm before processTriple is called.
//
// NOTE(review): listing lines 338-339 are absent, so the parse call
// inside the try block (presumably readRealData, per the error
// message below) is not visible.  Confirm against the header.
299template<class SC, class GO>
300struct ReadLine<SC, GO, false> {
321 static int
322 readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
323 const std::string& line,
324 const std::size_t lineNumber,
325 const bool tolerant = false,
326 std::ostream* errStrm = NULL,
327 const bool debug = false)
328 {
329 using ::Teuchos::MatrixMarket::checkCommentLine;
330 using ::Teuchos::MatrixMarket::readRealData;
331 using std::endl;
332
333 GO rowInd, colInd;
334 SC val;
335 std::istringstream istr (line);
336 bool success = true;
337 try {
340 }
341 catch (std::exception& e) {
// An exception is converted into a failure flag, so the caller sees
// a return code of -1 instead of the exception.
342 success = false;
343 if (errStrm != NULL) {
344 std::ostringstream os;
345 os << "readLine: readRealData threw an exception: " << e.what ()
346 << endl;
347 *errStrm << os.str ();
348 }
349 }
350
351 if (success) {
352 if (debug && errStrm != NULL) {
353 std::ostringstream os;
354 os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
355 << ", val=" << val << std::endl;
356 *errStrm << os.str ();
357 }
358 // This line may have side effects.
359 const int errCode = processTriple (rowInd, colInd, val);
360 if (errCode != 0 && errStrm != NULL) {
361 std::ostringstream os;
362 os << "readLine: processTriple returned " << errCode << " != 0."
363 << endl;
364 *errStrm << os.str ();
365 }
366 return errCode;
367 }
368 else {
369 return -1;
370 }
371 }
372};
373
// Stand-alone readLine.  Its parameter list matches the static
// ReadLine<...>::readLine methods in this namespace: the closure
// processTriple, the text of one line, its line number, a tolerant
// flag, an optional error stream (may be NULL), and a debug flag.
//
// NOTE(review): listing line 408 is absent, so only the tail of the
// single statement in the body (passing tolerant, errStrm, debug) is
// visible.  Presumably it returns the result of
// ReadLine<SC, GO>::readLine -- confirm against the header.
399template<class SC, class GO>
400int
401readLine (std::function<int (const GO, const GO, const SC&)> processTriple,
402 const std::string& line,
403 const std::size_t lineNumber,
404 const bool tolerant = false,
405 std::ostream* errStrm = NULL,
406 const bool debug = false)
407{
409 tolerant, errStrm, debug);
410}
411
// Read triples (sparse matrix entries) from a Matrix Market input
// stream, and hand them off to the processTriple closure.
//
// Parameters visible here:
//   curLineNum          incremented as lines are consumed with
//                       std::getline.
//   numTriplesRead      output argument; reset to 0 on entry.
//   processTriple       closure that receives each entry.
//   maxNumTriplesToRead presumably the upper bound on entries to read;
//                       its use is on an absent line -- confirm.
//   tolerant, debug     their uses are on absent lines -- confirm.
//   errStrm             optional stream for error reports; may be NULL.
//
// Return value: 0 if the stream is already at end-of-file on entry;
// -1 if the stream is in a failed state on entry; otherwise errCode.
// After the loop, errCode is forced to -1 if it was 0 and the stream
// is not at end-of-file but is in a failed state.
//
// NOTE(review): listing lines 444, 475, 484, 496, 498, 502, 504 and
// 516 are absent.  That hides the function name line with the
// inputStream parameter (see the member index at the end of this
// listing), the loop header, the comment-line test, the per-line
// parse call, the statements that update errCode and numTriplesRead,
// and the output of each bad line number.  Confirm against the header.
442template<class SC, class GO>
443int
445 std::size_t& curLineNum,
446 std::size_t& numTriplesRead,
447 std::function<int (const GO, const GO, const SC&)> processTriple,
448 const std::size_t maxNumTriplesToRead,
449 const bool tolerant = false,
450 std::ostream* errStrm = NULL,
451 const bool debug = false)
452{
453 using Teuchos::MatrixMarket::checkCommentLine;
454 using std::endl;
455 using std::size_t;
456
457 numTriplesRead = 0; // output argument only
458 if (inputStream.eof ()) {
459 return 0; // no error, just nothing left to read
460 }
461 else if (inputStream.fail ()) {
462 if (errStrm != NULL) {
463 *errStrm << "Input stream reports a failure (not the same as "
464 "end-of-file)." << endl;
465 }
466 return -1;
467 }
468
469 std::string line;
// Line numbers of entries that could not be processed; reported in
// one message after the loop.
470 std::vector<size_t> badLineNumbers;
471 int errCode = 0; // 0 means success
472
473 bool inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
474 ++curLineNum; // we read the line; we can't put it back
476 // if (debug && errStrm != NULL) {
477 // std::ostringstream os;
478 // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
479 // *errStrm << os.str ();
480 // }
// start and size delimit the part of the line handed to the parser
// below (see line.substr); they are presumably filled in by the
// comment-line test on the absent listing line 484 -- confirm.
481 size_t start, size;
482
483 const bool isCommentLine =
485 if (isCommentLine) {
486 // Move on to the next line, if there is a next line.
487 inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
488 ++curLineNum; // we read another line; we can't put it back
489 continue; // move on to the next line
490 }
491 else { // not a comment line; should have a sparse matrix entry
492 const std::string theLine = line.substr (start, size);
493 // If the line has a valid sparse matrix entry, extract it and
494 // hand it off to the processTriple closure.
495 const int curErrCode =
497 if (curErrCode != 0) {
499 badLineNumbers.push_back (curLineNum);
500 }
501 else {
503 }
505 inputStreamCanStillBeRead = std::getline (inputStream, line).good ();
506 }
507 }
508 } // while there are lines to read and we need more triples
509
// Summarize all bad lines in a single message.
510 if (errCode != 0 && errStrm != NULL) {
511 const size_t numBadLines = badLineNumbers.size ();
512 *errStrm << "Encountered " << numBadLines << " bad line"
513 << (numBadLines != size_t (1) ? "s" : "")
514 << ": [";
515 for (size_t k = 0; k < numBadLines; ++k) {
517 if (k + 1 < numBadLines) {
518 *errStrm << ", ";
519 }
520 }
521 *errStrm << "]" << endl;
522 }
// A failed stream that is not at end-of-file counts as an error even
// if every line parsed so far was fine.
523 if (! inputStream.eof () && inputStream.fail ()) {
524 if (errCode == 0) {
525 errCode = -1;
526 }
527 if (errStrm != NULL) {
528 *errStrm << "The input stream is not at end-of-file, "
529 "but is in a bad state." << endl;
530 }
531 }
532 return errCode;
533}
534
// Read at most maxNumEntPerMsg sparse matrix entries from the input
// stream, and send them to the process destRank (summary taken from
// the member index at the end of this listing).
//
// Protocol visible below: the byte count of the packed message goes
// out in sizeBuf[0] with tag sizeTag (42), and the packed entries go
// out in msgBuf with tag msgTag (43).  A size of zero is sent, and no
// packed message follows, when no entries were read or when an error
// occurred before packing.
//
// Parameters visible here:
//   curLineNum       passed through to the reader (call on an absent
//                    line) -- confirm.
//   numEntRead       output argument; reset to 0 before reading.
//   sizeBuf, msgBuf  send buffers; msgBuf is resized to the message
//                    size.
//   rowInds, colInds, vals  scratch arrays; emptied on entry and
//                    filled by the local processTriple closure.
//   destRank, comm   destination rank and communicator.
//   errStrm          optional stream for reports; may be NULL.
//   debug            enables trace output to errStrm.
//
// Returns errCode; 0 means success.
//
// NOTE(review): listing lines 562, 611-612, 667, 689, 718, 726, 731
// and 742 are absent.  That hides the function name line with the
// inputStream parameter, the readTriples call, the two size-counting
// calls, the two isend calls that create sizeReq and msgReq, and the
// head of the packTriplesCount call.  Confirm against the header.
560template<class SC, class GO>
561int
563 std::size_t& curLineNum,
564 std::size_t& numEntRead,
565 ::Teuchos::ArrayRCP<int>& sizeBuf,
566 ::Teuchos::ArrayRCP<char>& msgBuf,
567 std::vector<GO>& rowInds,
568 std::vector<GO>& colInds,
569 std::vector<SC>& vals,
570 const std::size_t maxNumEntPerMsg,
571 const int destRank,
572 const ::Teuchos::Comm<int>& comm,
573 const bool tolerant = false,
574 std::ostream* errStrm = NULL,
575 const bool debug = false)
576{
577 using ::Tpetra::Details::countPackTriplesCount;
578 using ::Tpetra::Details::countPackTriples;
579 using ::Tpetra::Details::packTriplesCount;
580 using ::Tpetra::Details::packTriples;
581 using ::Teuchos::isend;
582 using std::endl;
583
584 using ::Kokkos::ArithTraits;
585 // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
586 // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
587 constexpr int sizeTag = 42;
588 constexpr int msgTag = 43;
589 //constexpr int srcRank = 0;
590 int errCode = 0;
591
592 // This doesn't actually deallocate memory; it just changes the size
593 // back to zero, so that push_back starts over from the beginning.
594 rowInds.resize (0);
595 colInds.resize (0);
596 vals.resize (0);
597 // Closure that adds the new matrix entry to the above temp arrays.
598 auto processTriple = [&rowInds, &colInds, &vals]
599 (const GO rowInd, const GO colInd, const SC& val) {
600 try {
601 rowInds.push_back (rowInd);
602 colInds.push_back (colInd);
603 vals.push_back (val);
604 }
605 catch (...) {
// Any exception from push_back is turned into a nonzero return code.
606 return -1;
607 }
608 return 0;
609 };
610 numEntRead = 0; // output argument
613 errStrm, debug);
614 if (debug && errStrm != NULL) {
615 std::ostringstream os;
616 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
617 << ", GO=" << typeid (GO).name () << ": "
618 << "readAndSendOneBatchOfTriples: readTriples read "
619 << numEntRead << " matrix entries, and returned errCode="
620 << errCode << "." << std::endl;
621 *errStrm << os.str ();
622 }
// Sanity check: the closure pushes to all three arrays per entry, so
// each array size must equal the reported entry count.
623 if (numEntRead != rowInds.size () ||
624 numEntRead != colInds.size () ||
625 numEntRead != vals.size ()) {
626 if (errStrm != NULL) {
627 *errStrm << "readTriples size results are not consistent. "
628 << "numEntRead = " << numEntRead
629 << ", rowInds.size() = " << rowInds.size ()
630 << ", colInds.size() = " << colInds.size ()
631 << ", and vals.size() = " << vals.size () << "."
632 << std::endl;
633 }
634 if (errCode == 0) {
635 errCode = -1;
636 }
637 }
638
639 // We don't consider reading having "failed" if we've reached
640 // end-of-file before reading maxNumEntPerMsg entries. It's OK if
641 // we got fewer triples than that. Furthermore, we have to send at
642 // least one message to the destination process, even if the read
643 // from the file failed.
644
645 if (numEntRead == 0 || errCode != 0) {
646 // Send a message size of zero to the receiving process, to tell
647 // it that we have no triples to send, or that there was an error
648 // reading. The latter just means that we "go through the
649 // motions," then broadcast the error code.
650 sizeBuf[0] = 0;
651 if (debug && errStrm != NULL) {
652 std::ostringstream os;
653 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
654 << ", GO=" << typeid (GO).name () << ": "
655 << "Post send (size=0, errCode=" << errCode << ") "
656 << "to " << destRank << " with tag " << sizeTag << endl;
657 *errStrm << os.str ();
658 }
659 send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
660 return errCode;
661 }
662 else { // we read a nonzero # of triples, without error
// NOTE(review): narrowing from std::size_t to int; presumably safe
// because the count is bounded by maxNumEntPerMsg -- confirm.
663 const int numEnt = static_cast<int> (numEntRead);
664 int countSize = 0; // output argument
665 int triplesSize = 0; // output argument
666
668 // countSize should never be nonpositive, since we have to pack an
669 // integer size.
670 if (countSize <= 0 && errCode == 0) {
671 errCode = -1;
672 }
673
674 if (errCode != 0) {
675 // Send zero to the receiving process, to tell it about the error.
676 sizeBuf[0] = 0;
677 if (debug && errStrm != NULL) {
678 std::ostringstream os;
679 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
680 << ", GO=" << typeid (GO).name () << ": "
681 << "Post send (size=0, error case) to " << destRank
682 << " with tag " << sizeTag << endl;
683 *errStrm << os.str ();
684 }
685 send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
686 return errCode;
687 }
688 else { // countPackTriplesCount succeeded
690 if (errCode != 0) {
691 // Send a message size of zero to the receiving process, to
692 // tell it that there was an error counting.
693 sizeBuf[0] = 0;
694 if (debug && errStrm != NULL) {
695 std::ostringstream os;
696 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
697 << ", GO=" << typeid (GO).name () << ": "
698 << "Post send (size=0, error case) to " << destRank
699 << " with tag " << sizeTag << endl;
700 *errStrm << os.str ();
701 }
702 send (sizeBuf.getRawPtr (), 1, destRank, sizeTag, comm);
703 return errCode;
704 }
705 else { // countPackTriples succeeded; message packed & ready to send
706 // Send the message size (in bytes). We can use a nonblocking
707 // send here, and try to overlap with message packing.
// The message holds the packed count followed by the packed triples,
// so its size is the sum of the two counted sizes.
708 const int outBufSize = countSize + triplesSize;
709 sizeBuf[0] = outBufSize;
710 if (debug && errStrm != NULL) {
711 std::ostringstream os;
712 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
713 << ", GO=" << typeid (GO).name () << ": "
714 << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
715 << " with tag " << sizeTag << endl;
716 *errStrm << os.str ();
717 }
719
720 msgBuf.resize (outBufSize);
721 char* outBuf = msgBuf.getRawPtr ();
722
723 // If anything goes wrong with packing, send the pack buffer
724 // anyway, since the receiving process expects a message.
725 int outBufCurPos = 0; // input/output argument
727 outBufCurPos, comm, errStrm);
728 if (errCode == 0) {
729 errCode = packTriples<SC, GO> (rowInds.data (), colInds.data (),
730 vals.data (), numEnt, outBuf,
732 errStrm);
733 }
734 if (debug && errStrm != NULL) {
735 std::ostringstream os;
736 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
737 << ", GO=" << typeid (GO).name () << ": "
738 << "Post isend (packed data) to " << destRank
739 << " with tag " << msgTag << endl;
740 *errStrm << os.str ();
741 }
743
744 // Wait on the two messages. It doesn't matter in what order
745 // we send them, because they have different tags. The
746 // receiving process will wait on the first message first, in
747 // order to get the size of the second message.
748 if (debug && errStrm != NULL) {
749 std::ostringstream os;
750 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
751 << ", GO=" << typeid (GO).name () << ": "
752 << "Wait on isend (size)" << endl;
753 *errStrm << os.str ();
754 }
755 sizeReq->wait ();
756 if (debug && errStrm != NULL) {
757 std::ostringstream os;
758 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
759 << ", GO=" << typeid (GO).name () << ": "
760 << "Wait on isend (packed data)" << endl;
761 *errStrm << os.str ();
762 }
763 msgReq->wait ();
764
765 // This doesn't actually deallocate; it just resets sizes to zero.
766 rowInds.clear ();
767 colInds.clear ();
768 vals.clear ();
769 }
770 }
771 }
772 return errCode;
773}
774
782
// Receive one batch of sparse matrix entries from process srcRank and
// unpack it into rowInds, colInds, and vals.
//
// Protocol visible below: wait on the size request (sizeReq) that the
// caller posted before calling this function, then read the incoming
// byte count from sizeBuf[0].  A count of zero means no entries: the
// three arrays are emptied and numEnt stays 0.  Otherwise msgBuf is
// resized to that count, the packed message is received from srcRank
// with tag msgTag (43), and the arrays are resized to numEnt before
// unpacking.
//
// Parameters visible here:
//   rowInds, colInds, vals  output arrays (the first is declared on an
//                    absent line; see the member index at the end of
//                    this listing).
//   numEnt           output argument; reset to 0 on entry.
//   sizeBuf, msgBuf  receive buffers.
//   srcRank, comm    source rank and communicator.
//   tolerant         not used on any line visible here.
//   errStrm          optional stream for reports; may be NULL.
//   debug            enables trace output to errStrm.
//
// Returns errCode; 0 means success.
//
// NOTE(review): listing lines 812, 818, 829, 845, 884 and 890 are
// absent.  That hides the function name line, the sizeReq parameter,
// and the heads of the two unpack calls (presumably
// unpackTriplesCount and unpackTriples, per the using-declarations).
// Confirm against the header.
810template<class SC, class GO, class CommRequestPtr>
811int
813 std::vector<GO>& colInds,
814 std::vector<SC>& vals,
815 int& numEnt,
816 ::Teuchos::ArrayRCP<int>& sizeBuf,
817 ::Teuchos::ArrayRCP<char>& msgBuf,
819 const int srcRank,
820 const ::Teuchos::Comm<int>& comm,
821 const bool tolerant = false,
822 std::ostream* errStrm = NULL,
823 const bool debug = false)
824{
825 using ::Tpetra::Details::unpackTriplesCount;
826 using ::Tpetra::Details::unpackTriples;
827 using ::Kokkos::ArithTraits;
828
830 //constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
831 //constexpr int sizeTag = 42;
832 constexpr int msgTag = 43;
833 int errCode = 0; // return value
834 numEnt = 0; // output argument
835
836 // Wait on the ireceive we preposted before calling this function.
837 if (debug && errStrm != NULL) {
838 std::ostringstream os;
839 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
840 << ", GO=" << typeid (GO).name () << ": "
841 << "Wait on irecv (size)" << std::endl;
842 *errStrm << os.str ();
843 }
844 sizeReq->wait ();
846 const int inBufSize = sizeBuf[0];
847 if (debug && errStrm != NULL) {
848 std::ostringstream os;
849 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
850 << ", GO=" << typeid (GO).name () << ": "
851 << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
852 *errStrm << os.str ();
853 }
854
// Size zero: nothing follows, so report an empty batch.
855 if (inBufSize == 0) {
856 numEnt = 0;
857 rowInds.resize (0);
858 colInds.resize (0);
859 vals.resize (0);
860 }
861 else {
862 msgBuf.resize (inBufSize);
863 char* inBuf = msgBuf.getRawPtr ();
864
865 if (debug && errStrm != NULL) {
866 std::ostringstream os;
867 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
868 << ", GO=" << typeid (GO).name () << ": "
869 << "Post irecv (packed data) " << "from " << srcRank
870 << " with tag " << msgTag << std::endl;
871 *errStrm << os.str ();
872 }
873 auto msgReq = ::Teuchos::ireceive (msgBuf, srcRank, msgTag, comm);
874 if (debug && errStrm != NULL) {
875 std::ostringstream os;
876 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
877 << ", GO=" << typeid (GO).name () << ": "
878 << "Wait on irecv (packed data)" << std::endl;
879 *errStrm << os.str ();
880 }
881 msgReq->wait ();
882
883 int inBufCurPos = 0; // output argument
885 numEnt, comm, errStrm);
// Only unpack the entries if the count was obtained without error;
// the arrays are sized to numEnt first so data() has room.
886 if (errCode == 0) {
887 rowInds.resize (numEnt);
888 colInds.resize (numEnt);
889 vals.resize (numEnt);
891 rowInds.data (), colInds.data (),
892 vals.data (), numEnt, comm, errStrm);
893 }
894 }
895 return errCode;
896}
897
898} // namespace Impl
899
900//
901// SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
902//
903
// On Process 0 (srcRank) of the given communicator, read sparse
// matrix entries from inputStream in batches of at most
// maxNumEntPerMsg, and deal the batches out to the processes in
// round-robin order (destRank advances by one, modulo the number of
// processes, after each batch).
//
// Visible flow on srcRank: a batch whose destination is srcRank itself
// is read directly with Impl::readTriples; any other batch goes
// through Impl::readAndSendOneBatchOfTriples.  The loop stops at
// end-of-file or on the first nonzero errCode.  Afterwards a size of
// zero is sent with tag sizeTag (42) to the ranks from startRank
// upward, except srcRank, as the termination message.
//
// Visible flow on every other rank: repeatedly post a receive for the
// size, call Impl::recvOneBatchOfTriples, stop when a batch has zero
// entries, and otherwise pass each received entry to processTriple.
//
// inputStream, curLineNum and totalNumEntRead are only valid on
// Process 0 (per the comments on the parameters).  totalNumEntRead is
// reset to 0 on entry.  errStrm may be NULL; debug enables trace
// output to errStrm.
//
// Returns errCode.  Per the comment near the end, the local codes are
// combined across processes with a bitwise OR, so the result is
// nonzero if and only if some process had a nonzero local code.
//
// NOTE(review): listing lines 955, 991, 1001, 1014, 1016-1017, 1020,
// 1030, 1032, 1053, 1056, 1101 and 1143 are absent.  That hides,
// among other things, the updates of totalNumEntRead and errCode
// inside the loops, the assignment to lastMessageWasLegitZero, the
// definition of startRank, and the reduction call itself.  Confirm
// against the header.
937template<class SC, class GO>
938int
939readAndDealOutTriples (std::istream& inputStream, // only valid on Proc 0
940 std::size_t& curLineNum, // only valid on Proc 0
941 std::size_t& totalNumEntRead, // only valid on Proc 0
942 std::function<int (const GO, const GO, const SC&)> processTriple,
943 const std::size_t maxNumEntPerMsg,
944 const ::Teuchos::Comm<int>& comm,
945 const bool tolerant = false,
946 std::ostream* errStrm = NULL,
947 const bool debug = false)
948{
949 using Kokkos::ArithTraits;
950 using std::endl;
951 using std::size_t;
952
953 constexpr int srcRank = 0;
954 //constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
956 constexpr int sizeTag = 42;
957 //constexpr int msgTag = 43;
958 const int myRank = comm.getRank ();
959 const int numProcs = comm.getSize ();
960 int errCode = 0;
961
962 ::Teuchos::ArrayRCP<int> sizeBuf (1);
963 ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
964
965 // Temporary storage for reading & packing (on Process srcRank) or
966 // unpacking (every other process) triples.
967 std::vector<GO> rowInds;
968 std::vector<GO> colInds;
969 std::vector<SC> vals;
970 rowInds.reserve (maxNumEntPerMsg);
971 colInds.reserve (maxNumEntPerMsg);
972 vals.reserve (maxNumEntPerMsg);
973
974 totalNumEntRead = 0;
975 if (myRank == srcRank) {
976 // Loop around through all the processes, including this one, over
977 // and over until we reach the end of the file, or an error occurs.
978 int destRank = 0;
979 bool lastMessageWasLegitZero = false;
980 for ( ;
981 ! inputStream.eof () && errCode == 0;
982 destRank = (destRank + 1) % numProcs) {
983
984 size_t curNumEntRead = 0; // output argument of below
985 if (destRank == srcRank) {
986 // We can read and process the triples directly. We don't
987 // need to use intermediate storage, because we don't need to
988 // pack and send the triples.
989 const int readErrCode =
990 Impl::readTriples<SC, GO> (inputStream, curLineNum, curNumEntRead,
992 errStrm, debug);
993 if (debug && errStrm != NULL) {
994 std::ostringstream os;
995 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
996 << ", GO=" << typeid (GO).name () << ": "
997 << "(dest=src) readTriples returned curNumEntRead="
998 << curNumEntRead << ", errCode=" << readErrCode << endl;
999 *errStrm << os.str ();
1000 }
1002 }
1003 else {
// Disabled trace output: the leading false makes this branch
// unreachable.
1004 if (false && debug && errStrm != NULL) {
1005 std::ostringstream os;
1006 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1007 << ", GO=" << typeid (GO).name () << ": "
1008 << "Calling readAndSend... with destRank=" << destRank << endl;
1009 *errStrm << os.str ();
1010 }
1011 // Read, pack, and send the triples to destRank.
1012 const int readAndSendErrCode =
1013 Impl::readAndSendOneBatchOfTriples<SC, GO> (inputStream, curLineNum,
1015 sizeBuf, msgBuf,
1018 comm, tolerant, errStrm,
1019 debug);
1021 if (debug && errStrm != NULL) {
1022 std::ostringstream os;
1023 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1024 << ", GO=" << typeid (GO).name () << ": "
1025 << "readAndSend... with destRank=" << destRank
1026 << " returned curNumEntRead=" << curNumEntRead
1027 << ", errCode=" << readAndSendErrCode << endl;
1028 *errStrm << os.str ();
1029 }
// A successful batch with zero entries already delivered a size of
// zero to destRank; the debug message below treats that as the
// termination message for that rank.
1031 if (readAndSendErrCode == 0 && curNumEntRead == 0) {
1033 if (debug && errStrm != NULL) {
1034 std::ostringstream os;
1035 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1036 << ", GO=" << typeid (GO).name () << ": "
1037 << "Last send to " << destRank << " with tag " << sizeTag
1038 << " was legit zero, counts as termination" << endl;
1039 *errStrm << os.str ();
1040 }
1041 }
1042 }
1043 } // loop around through processes until done reading file, or error
1044
1045 // Loop around through the remaining processes, and tell them that
1046 // we're done, by sending zero. If the last message we sent to
1047 // destRank was zero, then skip that process, since it only
1048 // expects one message of size zero. Note that destRank got
1049 // incremented mod numProcs at end of loop, so we have to
1050 // decrement it mod numProcs.
1051 destRank = (destRank - 1) % numProcs;
1052 if (destRank < 0) { // C mod operator does not promise positivity
1054 }
1055
1057 for (int outRank = startRank; outRank < numProcs; ++outRank) {
1058 if (outRank != srcRank) {
1059 if (debug && errStrm != NULL) {
1060 std::ostringstream os;
1061 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1062 << ", GO=" << typeid (GO).name () << ": "
1063 << "Post send (size, termination msg) to " << outRank
1064 << " with tag " << sizeTag << "(was last message legit zero? "
1065 << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1066 *errStrm << os.str ();
1067 }
1068 sizeBuf[0] = 0;
1069 ::Teuchos::send (sizeBuf.getRawPtr (), 1, outRank, sizeTag, comm);
1070 }
1071 }
1072 }
1073 else {
1074 while (true) {
1075 // Prepost a message to receive the size (in bytes) of the
1076 // incoming packet.
1077 sizeBuf[0] = 0; // superfluous, but safe
1078 if (debug && errStrm != NULL) {
1079 std::ostringstream os;
1080 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1081 << ", GO=" << typeid (GO).name () << ": "
1082 << "Post irecv (size) from " << srcRank
1083 << " with tag " << sizeTag << std::endl;
1084 *errStrm << os.str ();
1085 }
1086 auto sizeReq = ::Teuchos::ireceive (sizeBuf, srcRank, sizeTag, comm);
1087
1088 int numEnt = 0; // output argument
1089 const int recvErrCode =
1090 Impl::recvOneBatchOfTriples (rowInds, colInds, vals, numEnt, sizeBuf,
1091 msgBuf, sizeReq, srcRank, comm, tolerant,
1092 errStrm, debug);
1093 if (debug && errStrm != NULL) {
1094 std::ostringstream os;
1095 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1096 << ", GO=" << typeid (GO).name () << ": "
1097 << "recvOneBatchOfTriples returned numEnt=" << numEnt
1098 << ", errCode=" << recvErrCode << endl;
1099 *errStrm << os.str ();
1100 }
1102
// Sanity check: all three arrays must hold exactly numEnt entries.
// A mismatch sets errCode to -1 unless it is already nonzero.
1103 if (numEnt != static_cast<int> (rowInds.size ()) ||
1104 numEnt != static_cast<int> (colInds.size ()) ||
1105 numEnt != static_cast<int> (vals.size ())) {
1106 errCode = (errCode == 0) ? -1 : errCode;
1107 if (errStrm != NULL) {
1108 *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1109 << "numEnt = " << numEnt
1110 << ", rowInds.size() = " << rowInds.size ()
1111 << ", colInds.size() = " << colInds.size ()
1112 << ", vals.size() = " << vals.size () << "."
1113 << endl;
1114 }
1115 } // if sizes inconsistent
1116
1117 // Sending zero items is how Process srcRank tells this process
1118 // that it (Process srcRank) is done sending out data.
1119 if (numEnt == 0) {
1120 break;
1121 }
1122
// Entries are only processed while errCode is still 0, but the
// receive loop itself keeps running until the zero-size message.
1123 for (int k = 0; k < numEnt && errCode == 0; ++k) {
1124 const int curErrCode = processTriple (rowInds[k], colInds[k], vals[k]);
1125 errCode = (curErrCode == 0) ? errCode : curErrCode;
1126 }
1127 } // while we still get messages from srcRank
1128 }
1129
1130 if (debug && errStrm != NULL) {
1131 std::ostringstream os;
1132 os << "Proc " << comm.getRank () << ", SC=" << typeid (SC).name ()
1133 << ", GO=" << typeid (GO).name () << ": "
1134 << "Done with send/recv loop" << endl;
1135 *errStrm << os.str ();
1136 }
1137 // Do a bitwise OR to get an error code that is nonzero if and only
1138 // if any process' local error code is nonzero.
1139 using ::Teuchos::outArg;
1140 using ::Teuchos::REDUCE_BOR;
1141 using ::Teuchos::reduceAll;
1142 const int lclErrCode = errCode;
1144 return errCode;
1145}
1146
1147} // namespace Details
1148} // namespace Tpetra
1149
1150#endif // TPETRA_DETAILS_READTRIPLES_HPP
bool readComplexData(std::istream &istr, OrdinalType &rowIndex, OrdinalType &colIndex, RealType &realPart, RealType &imagPart, const std::size_t lineNumber, const bool tolerant)
Read "<rowIndex> <colIndex> <realPart> <imagPart>" from a line.
int readTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numTriplesRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumTriplesToRead, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumTriplesToRead triples from the given Matrix Market input stream, and pass each resulting matrix entry along to the processTriple closure.
int recvOneBatchOfTriples(std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, int &numEnt, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, CommRequestPtr &sizeReq, const int srcRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Receive one batch of sparse matrix entries from the process srcRank, and unpack them into rowInds, colInds, and vals.
int readAndSendOneBatchOfTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numEntRead, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, const std::size_t maxNumEntPerMsg, const int destRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumEntPerMsg sparse matrix entries from the input stream, and send them to the proces...
int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
Struct that holds views of the contents of a CrsMatrix.
Implementation details of Tpetra.
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries ("triples").
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPer...
Namespace Tpetra contains the class and methods constituting the Tpetra library.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
Implementation of the readLine stand-alone function in this namespace (see below).
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...