/*
  $Id: ondiskmergeradapt.h 5149 2010-03-24 23:37:18Z abehm $

  Copyright (C) 2007 by The Regents of the University of California

  Redistribution of this file is permitted under the terms of the BSD license.

  Date: 09/06/2008
  Author: Alexander Behm
*/

#ifndef _ondiskmergeradapt_h_
#define _ondiskmergeradapt_h_

#include "ondiskmergerabs.h"
#include "divideskipmerger.h"
#include "filtertree/gramlist.h"
#include "filtertree/gramlistondisk.h"
#include "filtertree/statsutil.h"
#include "filtertree/ftindexerondisk.h"
#include "util/misc.h"
#include <fstream>
#include <vector>
#include <set>
#include <tr1/unordered_map>

#define WILDCHAR 91

#define POSFILTER_NONE 0
#define POSFILTER_MISMATCH 1
#define POSFILTER_DP (1<<1)
#define POSFILTER_ENDDP (1<<2)
#define POSFILTER_SUBSTR (1<<3)
#define POSFILTER_LENGTH (1<<4)

// forward declarations, overview of classes and template specializations
template<class InvList> class WeightedGramList;  // duplicate inverted lists are grouped into one weighted gram list, abbrev. wgl
template<class InvList> class Candidate;         // information about a candidate answer, abbrev. candi or c
template<class InvList> class WglDesc;           // weighted gram list descriptor, adds a leafID to a WeightedGramList, abbrev. wgld
template<class InvList> class WglDescSet;        // set of wglds, abbrev. wgldSet
template<class InvList> class HeapMergeElement;  // type of element used on the heap for heapMerge
template<class InvList> class LeafContext;       // info regarding the processing of one leaf, abbrev. lc
template<class InvList> class OnDiskMergerAdapt; // main merger class

// template specializations for an index with positional information
template<> class WeightedGramList<Array<PosID> >;
template<> class Candidate<Array<PosID> >;

template<class InvList = Array<unsigned> >
class WeightedGramList {
public:
  unsigned gramCode;                 // hashed q-gram
  unsigned weight;                   // how often the corresponding gram appears in the query
  GramListOnDisk<InvList>* gramList; // pointer to the inverted list on disk
};

template<class InvList = Array<unsigned> >
class Candidate {
public:
  unsigned id;           // string id
  unsigned weight;       // number of occurrences on all lists (we remove duplicate lists, so we actually accumulate weights)
  unsigned weightNeeded; // total weight of lists that the candidate must be absent from to be pruned
};

template<class InvList>
class WglDesc {
public:
  WeightedGramList<InvList>* wgl;
  unsigned leafID;

  WglDesc(unsigned i, WeightedGramList<InvList>* l) : wgl(l), leafID(i) {}
};

template<class InvList>
class WglDescSet {
public:
  vector<WglDesc<InvList>* > wglds;
  unsigned totalListSize; // sum of the listSizes of all wglds in wglds

  WglDescSet() : totalListSize(0) {}

  static bool cmpBySize(const WglDescSet* a, const WglDescSet* b) {
    return a->totalListSize < b->totalListSize;
  }

  ~WglDescSet() {
    for(unsigned i = 0; i < wglds.size(); i++) delete wglds.at(i);
  }
};

template<class InvList>
class HeapMergeElement {
public:
  typename InvList::elementType* element;
  unsigned listIndex;

  HeapMergeElement(typename InvList::elementType* el, unsigned li)
    : element(el), listIndex(li) {}

  bool operator==(const HeapMergeElement& e) const {
    return *element == *(e.element);
  }

  bool operator<(const HeapMergeElement& e) const {
    if(*element == *(e.element)) return listIndex < e.listIndex;
    return *element < *(e.element);
  }
};
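// Note on the merging threshold (a worked example, not part of the merging logic):
// the merger relies on the standard T-occurrence lower bound. If the query grams of
// one leaf have total weight W and the merging threshold is T, every answer must
// appear on lists of total weight >= T, so after reading lists of total weight
// W - T + 1 each answer has been encountered at least once. E.g., with W = 10 and
// T = 4, reading lists of weight 10 - 4 + 1 = 7 guarantees a complete candidate
// set; this is the initialWeight computed in LeafContext below.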
template<class InvList>
class LeafContext {
public:
  unsigned threshold;                        // merging threshold
  fstream* invListsFile;                     // pointer to the file of inverted lists
  vector<WeightedGramList<InvList>* >* wgls; // wgls ordered by size (globally ordered for seqMerge and locally ordered for regMerge)
  unsigned initialLists;                     // minimum number of lists to read to guarantee completeness of the candidate set
  unsigned initialWeight;                    // total weight of the initial lists
  unsigned remainingLists;                   // lists that we have not read yet
  unsigned remainingWeight;                  // total weight of the remaining lists
  unsigned currentList;                      // index in wgls of the next list to read
  unsigned* candidateCounts;                 // total number of candidates that have a particular weightNeeded
  vector<Candidate<InvList>* > candidates;   // current candidate set

  // helpers for the cost model
  unsigned listCounter;           // number of lists to be read from this leaf
  unsigned weightCounter;         // total weight of the listCounter lists
  unsigned weightedTotalListSize; // total list size of the listCounter lists, multiplied by the list weights
  unsigned cumulCandiCount;       // number of candidates potentially pruned by reading the listCounter lists

  // sortLeafLists indicates whether the wgls should be sorted by list size, false for seqMerge and true for regMerge
  LeafContext(unsigned threshold, fstream& invListsFile, vector<WeightedGramList<InvList>* >* wgls, bool sortLeafLists);

  // set initialLists to a specific value, used in seqMerge only
  void setInitialLists(unsigned initialLists);

  static bool cmpWglBySize(const WeightedGramList<InvList>* a, const WeightedGramList<InvList>* b) {
    return a->gramList->listSize < b->gramList->listSize;
  }

  ~LeafContext() {
    if(candidateCounts) delete[] candidateCounts;
  }
};

template<class InvList>
LeafContext<InvList>::
LeafContext(unsigned threshold, fstream& invListsFile, vector<WeightedGramList<InvList>* >* wgls, bool sortLeafLists)
  : threshold(threshold), invListsFile(&invListsFile), wgls(wgls) {

  // init helpers for the cost model
  weightCounter = 0;
  weightedTotalListSize = 0;
  listCounter = 0;
  cumulCandiCount = 0;

  // determine initialWeight
  unsigned totalWeight = 0;
  for(unsigned i = 0; i < wgls->size(); i++) totalWeight += wgls->at(i)->weight;
  initialWeight = totalWeight - threshold + 1;

  // determine the initial number of lists to read to reach initialWeight
  if(sortLeafLists) sort(wgls->begin(), wgls->end(), LeafContext<InvList>::cmpWglBySize);
  initialLists = 0;
  remainingWeight = 0;
  unsigned weightSum = 0;
  for(unsigned i = 0; i < wgls->size(); i++) {
    unsigned currentWeight = wgls->at(i)->weight;
    weightSum += currentWeight;
    if(weightSum >= initialWeight) {
      if(initialLists == 0) initialLists = i + 1;
      else remainingWeight += currentWeight;
    }
  }
  currentList = initialLists;
  remainingLists = wgls->size() - initialLists;
  candidateCounts = new unsigned[remainingWeight+1];
  memset(candidateCounts, 0, sizeof(unsigned) * (remainingWeight+1));
}

template<class InvList>
void
LeafContext<InvList>::
setInitialLists(unsigned initialLists) {
  if(initialLists <= this->initialLists) return;
  this->initialLists = initialLists;
  currentList = initialLists;
  remainingLists = wgls->size() - initialLists;

  // recompute initialWeight and remainingWeight
  initialWeight = 0;
  unsigned totalWeight = 0;
  for(unsigned i = 0; i < wgls->size(); i++) {
    if(i < initialLists) initialWeight += wgls->at(i)->weight;
    totalWeight += wgls->at(i)->weight;
  }
  remainingWeight = totalWeight - initialWeight;
  if(candidateCounts) delete[] candidateCounts;
  candidateCounts = new unsigned[remainingWeight+1];
  memset(candidateCounts, 0, sizeof(unsigned) * (remainingWeight+1));
}
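// Worked example of the pruning bookkeeping above: a candidate with accumulated
// weight w, merging threshold T, and remaining (unread) weight R gets
// weightNeeded = R - (T - w) + 1. If it is then absent from lists of total weight
// weightNeeded, the best it can still reach is w + (R - weightNeeded) = T - 1 < T,
// so it can safely be discarded. candidateCounts[k] is a histogram counting the
// candidates with weightNeeded == k; the cost model below consults it to estimate
// how many candidates the next lists could prune.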
// main merger class
template<class InvList = Array<unsigned> >
class OnDiskMergerAdapt : public OnDiskMergerAbs<OnDiskMergerAdapt<InvList>, InvList> {

private:
  typedef OnDiskMergerAbs<OnDiskMergerAdapt<InvList>, InvList> SuperClass;

  // parameters for the cost model
  bool estimationParamsSet;
  float readInvListTimeSlope;
  float readInvListTimeIntercept;
  float readStringAvgTime;

  // indicates which positional filters are to be used, see the defines at the beginning of this file
  char posFilters;

  // exponential probe, returns an iterator to the first element greater than or equal to value
  inline typename InvList::iterator expProbe(typename InvList::iterator start,
                                             typename InvList::iterator end,
                                             unsigned value) const;

  // remove candidates with weightNeeded == 0 from the candidate set
  void cleanseCandidates(vector<Candidate<InvList>* >& candidates) const;

  // length filter, returns POSFILTER_LENGTH if pruned or POSFILTER_NONE if not pruned
  char checkLength(const Query& query, const string& prePostQueryStr, const Candidate<InvList>& candidate) const;

  // entry point for merging by reading the sublists of one gram sequentially
  void seqMerge(const Query& query, vector<WeightedGramList<InvList>* >* allWgls, unsigned nLeaves,
                unsigned threshold, fstream& invListsFile, vector<unsigned>& results);

  // entry point for merging by processing leaves one-by-one (not reading the sublists of one gram sequentially)
  void regMerge(const Query& query, vector<WeightedGramList<InvList>* >* allWgls, unsigned nLeaves,
                unsigned threshold, fstream& invListsFile, vector<unsigned>& results);

  // create wgls from qgls, setting the weights of the wgls according to how often the same list appears in the qgls
  void detectDuplicateLists(vector<QueryGramList<InvList>* >& qgls, vector<WeightedGramList<InvList>* >& wgls) const;

  // read inverted lists, call heapMerge to create the initial candidate set, then clean up the inverted lists
  void mergeInitialLists(const Query& query, const string& prePostQueryStr,
                         LeafContext<InvList>& leafContext, vector<unsigned>& results) const;

  // merge the initial lists to obtain the initial candidate set
  void heapMerge(const Query& query, const string& prePostQueryStr,
                 LeafContext<InvList>& leafContext, vector<unsigned>& results) const;

  // estimate the benefit of reading the next lc.weightCounter lists
  float getEstimatedBenefit(LeafContext<InvList>& lc, unsigned numberStrings) const;

  // read the next listsToRead lists and process them to remove candidates
  void readNextLists(const Query& query, const string& prePostQueryStr,
                     LeafContext<InvList>& leafContext, unsigned listsToRead, vector<unsigned>& results) const;

  // add all candidate ids in lc->candidates to results
  void addRemainingCandidates(const Query& query, const string& prePostQueryStr,
                              LeafContext<InvList>* lc, vector<unsigned>& results) const;

  // read the initial lists of all leaves from disk; all sublists belonging to the same gram are read sequentially
  unsigned seqReadInitialLists(vector<LeafContext<InvList>* >& leafContexts,
                               const vector<WglDescSet<InvList>*>& ordWgldSets,
                               fstream& invListsFile) const;
  // for seqMerge the lists in all leaves should be globally (!) ordered according to the
  // total size of all sublists belonging to one gram; this function reorders the wgls and
  // produces a set of globally ordered gram lists (ordWgldSets) so we can keep track of
  // which (global) gram list to read next
  void reorderWgls(vector<WeightedGramList<InvList>*>* ordWgls,
                   vector<WglDescSet<InvList>*>& ordWgldSets,
                   const vector<WeightedGramList<InvList>*>* allWgls,
                   unsigned nLeaves) const;

  // only for indexes with positional information: update the candidate set from one PosID on an inverted list
  void processPosID(const Query& query, const string& prePostQueryStr, unsigned gramCode,
                    unsigned candiID, vector<unsigned char>& wglPos, unsigned char candiPos,
                    bool* positionsMatched, Candidate<InvList>* c) const;

public:
  // singleFilterOpt indicates whether we read lists belonging to the same gram sequentially
  OnDiskMergerAdapt(bool singleFilterOpt = true, char posFilters = POSFILTER_NONE)
    : SuperClass(singleFilterOpt), estimationParamsSet(false), posFilters(posFilters) {}

  // main entry point called from the searcher
  void merge_Impl(const Query& query,
                  vector<vector<QueryGramList<InvList>* >* >& allQgls,
                  const unsigned threshold,
                  fstream& invListsFile,
                  unsigned numberFilters,
                  vector<unsigned>& results);

  // records how many strings are in each partition (leaf), used in the cost model
  // for determining the probability of pruning candidates
  vector<unsigned> numberStringsInLeaf;

  // for setting the cost-model parameters
  bool estimationParamsNeeded_Impl() { return !estimationParamsSet; }
  void setEstimationParams_Impl(float readInvListTimeSlope, float readInvListTimeIntercept, float readStringAvgTime) {
    this->readInvListTimeSlope = readInvListTimeSlope;
    this->readInvListTimeIntercept = readInvListTimeIntercept;
    this->readStringAvgTime = readStringAvgTime;
    estimationParamsSet = true;
  }

  // comparison function for sorting
  static bool cmpQglByGramCode(const QueryGramList<InvList>* a, const QueryGramList<InvList>* b) {
    return a->gramCode < b->gramCode;
  }

  // comparison function for sorting, moves candidates with weightNeeded == 0 to the back
  static bool cmpCandidateByWeightNeeded(const Candidate<InvList>* a, const Candidate<InvList>* b) {
    if(a->weightNeeded == 0) return false;
    if(b->weightNeeded == 0) return true;
    return a->id < b->id;
  }

  string getName() { return "OnDiskMergerAdapt"; }
};

template<class InvList>
void
OnDiskMergerAdapt<InvList>::
addRemainingCandidates(const Query& query, const string& prePostQueryStr,
                       LeafContext<InvList>* lc, vector<unsigned>& results) const {
  for(unsigned x = 0; x < lc->candidates.size(); x++) {
    results.push_back(lc->candidates.at(x)->id);
    delete lc->candidates.at(x);
  }
}

template<class InvList>
char
OnDiskMergerAdapt<InvList>::
checkLength(const Query& query, const string& prePostQueryStr, const Candidate<InvList>& candidate) const {
  signed l1 = prePostQueryStr.size();
  signed l2 = candidate.partialStr.size();
  if(abs(l1 - l2) > (signed)query.threshold) return POSFILTER_LENGTH;
  else return POSFILTER_NONE;
}

template<class InvList>
void
OnDiskMergerAdapt<InvList>::
cleanseCandidates(vector<Candidate<InvList>* >& candidates) const {
  sort(candidates.begin(), candidates.end(), OnDiskMergerAdapt<InvList>::cmpCandidateByWeightNeeded);
  while(candidates.size() > 0 && candidates.back()->weightNeeded == 0) {
    delete candidates.back();
    candidates.pop_back();
  }
}

template<class InvList>
typename InvList::iterator
OnDiskMergerAdapt<InvList>::
expProbe(typename InvList::iterator start, typename InvList::iterator end, unsigned value) const {
  unsigned c = 0;
  for(;;) {
    typename InvList::iterator iter = start + (1 << c);
    if(iter >= end) return end;
    else if(*iter >= value) return iter;
    c++;
  }
}
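// expProbe performs a galloping (exponential) search: starting at 'start' it probes
// offsets 1, 2, 4, 8, ... until it hits an element >= value or runs past 'end', so
// the subsequent lower_bound only needs to search the small range it returns. Since
// candidates are looked up in increasing id order, each list is effectively scanned
// left to right with O(log d) probes per lookup, d being the gap to the target.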
template<class InvList>
void
OnDiskMergerAdapt<InvList>::
heapMerge(const Query& query, const string& prePostQueryStr, LeafContext<InvList>& lc, vector<unsigned>& results) const {
  unsigned pointersIndex[lc.initialLists];
  memset(pointersIndex, 0, lc.initialLists * sizeof(unsigned));

  Heap<HeapMergeElement<InvList> > heap(lc.initialLists);

  // make the initial heap
  for(unsigned i = 0; i < lc.initialLists; i++) {
    unsigned& e = lc.wgls->at(i)->gramList->getArray()->at(0);
    heap.push(new HeapMergeElement<InvList>(&e, i));
  }

  HeapMergeElement<InvList>* poppedElements[lc.initialLists];
  while(!heap.empty()) {
    // gather the lists whose cursors must be advanced to the next position
    unsigned poppedElementCount = 0;
    HeapMergeElement<InvList>* minData = heap.head();

    Candidate<InvList>* c = new Candidate<InvList>();
    c->id = *(minData->element);
    c->weight = 0;

    // pop elements with the same id and accumulate their weight in the candidate
    while(*(minData) == *(heap.head()) && poppedElementCount < lc.initialLists) {
      poppedElements[poppedElementCount++] = heap.head();
      c->weight += lc.wgls->at(heap.head()->listIndex)->weight;
      heap.pop();
      if(heap.empty()) break;
    }

    // check whether the candidate weight exceeds the merging threshold and passes the filters
    if(c->weight >= lc.threshold) {
      results.push_back(c->id);
      delete c;
    }
    else {
      c->weightNeeded = lc.remainingWeight - (lc.threshold - c->weight) + 1;
      if((signed)c->weightNeeded > 0) {
        if(this->charsumStats) {
          if(this->charsumFilter->passesFilter(this->queryCharsumStats, &this->charsumStats[c->id], (unsigned)query.threshold)) {
            lc.candidateCounts[c->weightNeeded]++;
            lc.candidates.push_back(c);
          }
          else delete c;
        }
        else {
          lc.candidateCounts[c->weightNeeded]++;
          lc.candidates.push_back(c);
        }
      }
      else delete c; // the candidate can never reach the threshold anymore
    }

    // move to the next element on every popped list and push it onto the heap
    for(unsigned i = 0; i < poppedElementCount; i++) {
      unsigned index = poppedElements[i]->listIndex;
      pointersIndex[index]++;
      unsigned position = pointersIndex[index];
      if(position < lc.wgls->at(index)->gramList->getArray()->size()) {
        unsigned& e = lc.wgls->at(index)->gramList->getArray()->at(position);
        heap.push(new HeapMergeElement<InvList>(&e, index));
      }
      delete poppedElements[i];
    }
  }
}

template<class InvList>
float
OnDiskMergerAdapt<InvList>::
getEstimatedBenefit(LeafContext<InvList>& lc, unsigned numberStrings) const {
  float avgListSize = (float)lc.weightedTotalListSize / (float)(lc.weightCounter);
  float psuccess = 1.0f - (avgListSize / numberStrings);

  // calculate the benefit of reading the next weightCounter lists
  float benefitReadLists = 0.0f;
  for(unsigned j = 1; j <= lc.weightCounter; j++) {
    if(lc.candidateCounts[j] > 0) {
      float p = binomialDistrib(j, lc.weightCounter, psuccess, true);
      benefitReadLists += lc.candidateCounts[j] * readStringAvgTime * p;
    }
  }
  return benefitReadLists;
}
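// Sketch of the benefit estimate above: psuccess is the estimated probability that
// a random string id is absent from one of the next lists (1 - avgListSize/numberStrings).
// A candidate with weightNeeded == j is pruned only if it misses lists covering
// weight >= j, which the binomial tail binomialDistrib(j, weightCounter, psuccess, true)
// estimates. Every pruned candidate saves roughly readStringAvgTime, the average cost
// of fetching and verifying one string, hence the weighted summation.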
template<class InvList>
void
OnDiskMergerAdapt<InvList>::
readNextLists(const Query& query, const string& prePostQueryStr, LeafContext<InvList>& leafContext,
              unsigned listsToRead, vector<unsigned>& results) const {
  for(unsigned i = 0; i < listsToRead; i++) {
    WeightedGramList<InvList>* wgl = leafContext.wgls->at(leafContext.currentList + i);
    GramListOnDisk<InvList>* gl = wgl->gramList;
    InvList* array = gl->getArray(leafContext.invListsFile);
    leafContext.remainingWeight -= wgl->weight;

    // do a lookup for every candidate
    bool cleanseNeeded = false;
    typename InvList::iterator start = array->begin();
    for(unsigned j = 0; j < leafContext.candidates.size(); j++) {
      Candidate<InvList>* candidate = leafContext.candidates.at(j);
      if((signed)candidate->weightNeeded <= 0) continue;

      typename InvList::iterator end = expProbe(start, array->end(), candidate->id);
      start = lower_bound(start, end, candidate->id);
      if(start != array->end() && *start == candidate->id) {
        candidate->weight += wgl->weight;
        if(candidate->weight >= leafContext.threshold) {
          leafContext.candidateCounts[candidate->weightNeeded]--;
          candidate->weightNeeded = 0;
          results.push_back(candidate->id);
          cleanseNeeded = true;
        }
      }
      else {
        leafContext.candidateCounts[candidate->weightNeeded]--;
        candidate->weightNeeded = (candidate->weightNeeded > wgl->weight) ? candidate->weightNeeded - wgl->weight : 0;
        if(candidate->weightNeeded > leafContext.remainingWeight) candidate->weightNeeded = 0;
        if(candidate->weightNeeded > 0) leafContext.candidateCounts[candidate->weightNeeded]++;
        else cleanseNeeded = true;
      }
    }

    // check whether we need to remove items from the candidate set,
    // i.e. whether we can guarantee that some elements either are answers or cannot be answers
    if(cleanseNeeded) cleanseCandidates(leafContext.candidates);
    gl->clear();
  }
}

template<class InvList>
void
OnDiskMergerAdapt<InvList>::
mergeInitialLists(const Query& query, const string& prePostQueryStr, LeafContext<InvList>& leafContext, vector<unsigned>& results) const {
  // read the short lists from disk (or retrieve them from the cache if already read)
  for(unsigned i = 0; i < leafContext.initialLists; i++)
    leafContext.wgls->at(i)->gramList->getArray(leafContext.invListsFile);

  heapMerge(query, prePostQueryStr, leafContext, results);

  // clean up the lists
  for(unsigned i = 0; i < leafContext.initialLists; i++)
    leafContext.wgls->at(i)->gramList->clear();
}

template<class InvList>
void
OnDiskMergerAdapt<InvList>::
detectDuplicateLists(vector<QueryGramList<InvList>* >& qgls, vector<WeightedGramList<InvList>* >& wgls) const {
  sort(qgls.begin(), qgls.end(), OnDiskMergerAdapt<InvList>::cmpQglByGramCode);
  unsigned i = 0;
  while(i < qgls.size()) {
    QueryGramList<InvList>* currentQGL = qgls.at(i);
    unsigned currentCount = 0;
    while(i < qgls.size()) {
      if(currentQGL->gramCode == qgls.at(i)->gramCode) {
        currentCount++;
        i++;
      }
      else break;
    }
    WeightedGramList<InvList>* wgl = new WeightedGramList<InvList>();
    wgl->gramCode = currentQGL->gramCode;
    wgl->gramList = dynamic_cast<GramListOnDisk<InvList>*>(currentQGL->gl);
    wgl->weight = currentCount;
    wgls.push_back(wgl);
  }
}

template<class InvList>
void
OnDiskMergerAdapt<InvList>::
reorderWgls(vector<WeightedGramList<InvList>*>* ordWgls, vector<WglDescSet<InvList>*>& ordWgldSets,
            const vector<WeightedGramList<InvList>*>* allWgls, unsigned nLeaves) const {
  tr1::unordered_map<unsigned, WglDescSet<InvList>* > groupedWgls;

  // fill groupedWgls
  for(unsigned i = 0; i < nLeaves; i++) {
    const vector<WeightedGramList<InvList>* >& currentLists = allWgls[i];
    for(unsigned j = 0; j < currentLists.size(); j++) {
      WeightedGramList<InvList>* wgl = currentLists.at(j);
      WglDescSet<InvList>* wgldSet = NULL;
      if(groupedWgls.find(wgl->gramCode) == groupedWgls.end()) {
        wgldSet = new WglDescSet<InvList>();
        groupedWgls[wgl->gramCode] = wgldSet;
        ordWgldSets.push_back(wgldSet);
      }
      else wgldSet = groupedWgls[wgl->gramCode];
      wgldSet->wglds.push_back(new WglDesc<InvList>(i, wgl));
      wgldSet->totalListSize += wgl->gramList->listSize;
    }
  }

  // sort the vector of WglDescSets by total sublist size
  sort(ordWgldSets.begin(), ordWgldSets.end(), WglDescSet<InvList>::cmpBySize);

  // reorder the gram lists of each leaf to be consistent with the global ordering (by the sum of the sublist sizes)
  for(unsigned i = 0; i < ordWgldSets.size(); i++) {
    WglDescSet<InvList>* wgldSet = ordWgldSets.at(i);
    vector<WglDesc<InvList>* >& wglds = wgldSet->wglds;
    for(unsigned j = 0; j < wglds.size(); j++) {
      ordWgls[wglds.at(j)->leafID].push_back(wglds.at(j)->wgl);
    }
  }
}
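// Example of the global ordering: a gram such as "ab" may have one sublist in every
// leaf; reorderWgls bundles those sublists into a single WglDescSet and sorts all
// sets by totalListSize. Because the on-disk index lays out the sublists of one gram
// contiguously (in leaf order), seqReadInitialLists below can fetch an entire set
// with one seek followed by purely sequential reads.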
template<class InvList>
unsigned
OnDiskMergerAdapt<InvList>::
seqReadInitialLists(vector<LeafContext<InvList>* >& leafContexts,
                    const vector<WglDescSet<InvList>*>& ordWgldSets,
                    fstream& invListsFile) const {
  // we must keep reading all sorted sublists belonging to a gram as long as
  // there exists one leaf for which we have not yet read initialLists lists
  bool done = false;
  unsigned globalCurrentList = 0;
  unsigned nLeaves = leafContexts.size();
  while(!done) {
    WglDescSet<InvList>* wgldSet = ordWgldSets.at(globalCurrentList);
    vector<WglDesc<InvList>*>& wglds = wgldSet->wglds;

    // seek to the start offset of the first sorted sublist
    WglDesc<InvList>* firstWgld = *(wglds.begin());
    GramListOnDisk<InvList>* firstGl = firstWgld->wgl->gramList;
    invListsFile.seekg(firstGl->startOffset);

    // now fill the sorted sublists in order in one sequential I/O;
    // fillArray is implemented NOT to perform a disk seek
    typename vector<WglDesc<InvList>* >::iterator setiter;
    for(setiter = wglds.begin(); setiter != wglds.end(); setiter++) {
      WglDesc<InvList>* wgld = *setiter;
      GramListOnDisk<InvList>* gl = wgld->wgl->gramList;
      // sanity check; in some cases this condition could be true, it is NOT an error
      if(invListsFile.tellg() != gl->startOffset) invListsFile.seekg(gl->startOffset);
      gl->fillArray(&invListsFile);
      leafContexts.at(wgld->leafID)->listCounter++;
    }

    // check whether we need to read more lists
    unsigned doneCount = 0;
    for(unsigned i = 0; i < nLeaves; i++)
      if(leafContexts.at(i)->listCounter >= leafContexts.at(i)->initialLists) doneCount++;

    done = doneCount >= nLeaves;
    globalCurrentList++;
  }
  return globalCurrentList;
}
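// Overview of seqMerge below: it drives all leaves at once. First it reads list
// groups in the global order until every leaf has its initialLists filled
// (seqReadInitialLists), then it builds each leaf's candidate set via heapMerge,
// and finally it iterates the cost model to decide whether fetching the next
// globally ordered list group is cheaper than verifying the surviving candidates.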
template<class InvList>
void
OnDiskMergerAdapt<InvList>::
seqMerge(const Query& query, vector<WeightedGramList<InvList>* >* allWgls, unsigned nLeaves,
         unsigned threshold, fstream& invListsFile, vector<unsigned>& results) {

  // only needed for an indexer with positional information
  string prePostQueryStr;
  if(posFilters != POSFILTER_NONE) this->gramGen->getPrePostString(query.str, prePostQueryStr);

  // reorder the inverted lists according to a global ordering, filling ordWgldSets and ordWgls
  vector<WeightedGramList<InvList>* > ordWgls[nLeaves];
  vector<WglDescSet<InvList>*> ordWgldSets;
  reorderWgls(ordWgls, ordWgldSets, allWgls, nLeaves);

  // initialize the context of every leaf
  vector<LeafContext<InvList>* > leafContexts;
  for(unsigned i = 0; i < nLeaves; i++) {
    LeafContext<InvList>* lc = new LeafContext<InvList>(threshold, invListsFile, &ordWgls[i], false);
    leafContexts.push_back(lc);
  }

  // read the initial lists of all leaves; sublists belonging to one gram are read sequentially
  unsigned globalCurrentList = seqReadInitialLists(leafContexts, ordWgldSets, invListsFile);

#ifdef DEBUG_STAT
  this->numberSeeks = globalCurrentList;
#endif

  // perform the initial merging for all leaves
  for(unsigned i = 0; i < nLeaves; i++) {
    LeafContext<InvList>* lc = leafContexts.at(i);
    lc->setInitialLists(lc->listCounter);
    mergeInitialLists(query, prePostQueryStr, *lc, results);
  }

  // start the cost-based selection of additional lists to read
  unsigned listsToRead = 1; // number of lists we decide to read in every iteration
  unsigned lastListCounter = ordWgldSets.size();
  unsigned lastListCounterRepeats = 0;
  unsigned lastCandidatesLeft = 0;
  while(listsToRead && globalCurrentList < ordWgldSets.size()) {
    listsToRead = 0;

    // reset the cost-estimation helpers in all leaf contexts
    for(unsigned i = 0; i < nLeaves; i++) {
      leafContexts.at(i)->listCounter = 0;
      leafContexts.at(i)->weightCounter = 0;
      leafContexts.at(i)->weightedTotalListSize = 0;
      leafContexts.at(i)->cumulCandiCount = 0;
    }

    // estimate the benefit and cost of reading any number of the remaining lists
    unsigned totalListSize = 0;
    unsigned listCounter = 0;
    for(unsigned i = globalCurrentList; i < ordWgldSets.size(); i++) {
      listCounter++;
      WglDescSet<InvList>* wgldSet = ordWgldSets.at(i);
      vector<WglDesc<InvList>*>& wglds = wgldSet->wglds;

      // update the cost-estimation helpers
      typename vector<WglDesc<InvList>*>::iterator setiter;
      for(setiter = wglds.begin(); setiter != wglds.end(); setiter++) {
        WglDesc<InvList>* wgld = *setiter;
        unsigned leafID = wgld->leafID;
        LeafContext<InvList>* lc = leafContexts.at(leafID);
        WeightedGramList<InvList>* wgl = lc->wgls->at(lc->currentList + lc->listCounter);
        unsigned newWeightCounter = lc->weightCounter + wgl->weight;
        for(unsigned j = lc->weightCounter+1; j <= newWeightCounter; j++)
          lc->cumulCandiCount += lc->candidateCounts[j];
        lc->weightCounter = newWeightCounter;
        lc->weightedTotalListSize += wgl->gramList->listSize * wgl->weight;
        lc->listCounter++;
      }

      // determine the benefit of reading the lists and the cost of not reading them
      float benefitReadLists = 0.0f;
      float costNoReadLists = 0.0f;
      unsigned candidatesLeft = 0;
      for(unsigned j = 0; j < nLeaves; j++) {
        candidatesLeft += leafContexts.at(j)->candidates.size();
        if(leafContexts.at(j)->weightCounter > 0)
          benefitReadLists += getEstimatedBenefit(*leafContexts.at(j), numberStringsInLeaf.at(j));
        if(leafContexts.at(j)->remainingWeight > 0 && leafContexts.at(j)->candidates.size() > 0)
          costNoReadLists += leafContexts.at(j)->cumulCandiCount * readStringAvgTime;
      }

      totalListSize += wgldSet->totalListSize;
      float costReadLists = (listCounter * readInvListTimeIntercept + totalListSize * readInvListTimeSlope) - benefitReadLists;
      if(costReadLists < costNoReadLists) {
        // first heuristic to terminate reading lists
        if(candidatesLeft <= nLeaves) break;

        // second heuristic to terminate reading lists
        if(listCounter > 1 && listCounter >= lastListCounter && lastCandidatesLeft == candidatesLeft) {
          lastListCounterRepeats++;
          if(lastListCounterRepeats > listCounter) break;
        }
        else {
          lastCandidatesLeft = candidatesLeft;
          lastListCounter = listCounter;
          lastListCounterRepeats = 0;
        }

        listsToRead = 1;
        break;
      }
    }

    // read the next lists using sequential I/Os
    for(unsigned i = 0; i < listsToRead; i++) {
      WglDescSet<InvList>* wgldSet = ordWgldSets.at(globalCurrentList + i);
      vector<WglDesc<InvList>*>& wglds = wgldSet->wglds;

      // now fill the sorted sublists in order in one sequential I/O;
      // fillArray is implemented NOT to perform a disk seek
      typename vector<WglDesc<InvList>*>::iterator setiter;
      for(setiter = wglds.begin(); setiter != wglds.end(); setiter++) {
        WglDesc<InvList>* wgld = *setiter;
        if(leafContexts.at(wgld->leafID)->candidates.size() > 0) {
          GramListOnDisk<InvList>* gl = wgld->wgl->gramList;
          // handle the first seek and the sanity check in this manner
          if(invListsFile.tellg() != gl->startOffset) invListsFile.seekg(gl->startOffset);
          gl->fillArray(&invListsFile);
        }
      }

      // now process the sublists just read
      for(setiter = wglds.begin(); setiter != wglds.end(); setiter++) {
        WglDesc<InvList>* wgld = *setiter;
        if(leafContexts.at(wgld->leafID)->candidates.size() > 0) {
          LeafContext<InvList>* lc = leafContexts.at(wgld->leafID);
          readNextLists(query, prePostQueryStr, *lc, 1, results);
          lc->currentList++;
        }
      }
    }

    // if listsToRead is 0 we cannot gain a benefit by reading any number of further lists
    globalCurrentList += listsToRead;

#ifdef DEBUG_STAT
    this->numberSeeks += listsToRead;
#endif
  }

  // add the remaining candidates to the result set and delete the candidate instances
  for(unsigned i = 0; i < nLeaves; i++)
    addRemainingCandidates(query, prePostQueryStr, leafContexts.at(i), results);

  // cleanup
  for(unsigned i = 0; i < ordWgldSets.size(); i++) delete ordWgldSets.at(i);
  for(unsigned i = 0; i < leafContexts.size(); i++) delete leafContexts.at(i);
}
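// The stopping rule shared by seqMerge and regMerge compares, for each prefix of
// the remaining lists,
//   costReadLists   = listCounter * readInvListTimeIntercept
//                   + totalListSize * readInvListTimeSlope - benefitReadLists
// (a linear disk-I/O model minus the expected savings from pruned candidates)
// against costNoReadLists, the cost of verifying the candidates those lists might
// have pruned (cumulCandiCount * readStringAvgTime). Reading continues only while
// some prefix is estimated to pay for itself; the two heuristics terminate the
// search when the estimates stagnate.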
template<class InvList>
void
OnDiskMergerAdapt<InvList>::
regMerge(const Query& query, vector<WeightedGramList<InvList>* >* allWgls, unsigned nLeaves,
         unsigned threshold, fstream& invListsFile, vector<unsigned>& results) {

  // only needed for an indexer with positional information
  string prePostQueryStr;
  if(posFilters != POSFILTER_NONE) this->gramGen->getPrePostString(query.str, prePostQueryStr);

  // search every leaf node
  for(unsigned i = 0; i < nLeaves; i++) {
    // initialize the context of this leaf
    LeafContext<InvList> lc(threshold, invListsFile, &allWgls[i], true);

    // merge the initial short lists
    mergeInitialLists(query, prePostQueryStr, lc, results);

#ifdef DEBUG_STAT
    this->numberSeeks += lc.initialLists;
#endif

    unsigned listsToRead = 1; // number of lists we decide to read in every iteration
    while(listsToRead && lc.currentList < lc.wgls->size()) {
      listsToRead = 0;

      // for every possible number of remaining lists determine the benefit and cost
      unsigned totalListSize = 0;
      unsigned listCounter = 0;
      unsigned lastListCounter = lc.wgls->size();
      unsigned lastListCounterRepeats = 0;
      unsigned lastCandidatesLeft = 0;
      for(unsigned x = lc.currentList; x < lc.wgls->size(); x++) {
        listCounter++;
        WeightedGramList<InvList>* wgl = lc.wgls->at(x);
        unsigned newWeightCounter = lc.weightCounter + wgl->weight;
        for(unsigned j = lc.weightCounter+1; j <= newWeightCounter; j++)
          lc.cumulCandiCount += lc.candidateCounts[j];
        lc.weightCounter = newWeightCounter;
        lc.weightedTotalListSize += wgl->gramList->listSize * wgl->weight;
        totalListSize += wgl->gramList->listSize;

        float benefitReadLists = getEstimatedBenefit(lc, numberStringsInLeaf.at(i));
        float costNoReadLists = lc.cumulCandiCount * readStringAvgTime;
        float costReadLists = (listCounter * readInvListTimeIntercept + lc.weightedTotalListSize * readInvListTimeSlope) - benefitReadLists;
        if(costReadLists < costNoReadLists) {
          // first heuristic to terminate reading lists
          if(lc.candidates.size() <= 1) break;

          // second heuristic to terminate reading lists
          if(listCounter > 1 && listCounter >= lastListCounter && lastCandidatesLeft == lc.candidates.size()) {
            lastListCounterRepeats++;
            if(lastListCounterRepeats > listCounter) break;
          }
          else {
            lastCandidatesLeft = lc.candidates.size();
            lastListCounter = listCounter;
            lastListCounterRepeats = 0;
          }

          listsToRead = 1;
          break;
        }
      }

      if(listsToRead) readNextLists(query, prePostQueryStr, lc, listsToRead, results);

      // if listsToRead is 0 we cannot gain a benefit by reading any number of further lists
      lc.currentList += listsToRead;

#ifdef DEBUG_STAT
      this->numberSeeks += listsToRead;
#endif
    }

    // add the remaining candidates to the result set and clear the candidate information
    addRemainingCandidates(query, prePostQueryStr, &lc, results);
  }
}

template<class InvList>
void
OnDiskMergerAdapt<InvList>::
merge_Impl(const Query& query, vector<vector<QueryGramList<InvList>* >* >& allQgls, unsigned threshold,
           fstream& invListsFile, unsigned numberFilters, vector<unsigned>& results) {

  unsigned nLeaves = allQgls.size();

  // eliminate duplicate inverted lists, and assign each list a weight indicating
  // how many times its gram appears in the query
  vector<WeightedGramList<InvList>* > allWgls[nLeaves];
  for(unsigned i = 0; i < nLeaves; i++)
    detectDuplicateLists(*allQgls.at(i), allWgls[i]);

  // check for the single-filter optimization, reading lists sequentially
  if(numberFilters == 1 && this->singleFilterOpt && allQgls.size() > 1)
    seqMerge(query, allWgls, nLeaves, threshold, invListsFile, results);
  else
    // leaves are processed one-by-one; the lists of the same gram are NOT read sequentially (if there is partitioning)
    regMerge(query, allWgls, nLeaves, threshold, invListsFile, results);

  // clean up the weighted gram lists
  for(unsigned i = 0; i < nLeaves; i++)
    for(unsigned j = 0; j < allWgls[i].size(); j++)
      delete allWgls[i].at(j);
}

// specialize the weighted gram list for the positional inverted-list type;
// if we have positions we also record them
template<>
class WeightedGramList<Array<PosID> > {
public:
  unsigned gramCode;
  unsigned weight;
  GramListOnDisk<Array<PosID> >* gramList;
  vector<unsigned char> positions;
};
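// With positional lists each candidate assembles a partial image of its string:
// every matching gram writes its characters at the position recorded in the PosID,
// and unmatched regions keep the wildcard WILDCHAR. The POSFILTER_* filters then try
// to prune the candidate, e.g. by counting mismatched gram positions or by running
// the wildcard-aware edit distance wildcardEd on the partial string, before the full
// string is ever fetched from disk.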
// specialize the candidate for the positional inverted-list type;
// if we have positions then we also record the positions of mismatching grams
#pragma pack(push,1)
template<>
class Candidate<Array<PosID> > {
public:
  unsigned id;
  unsigned weight;
  unsigned weightNeeded;
  set<unsigned char> mismatchPositions; // positions of mismatched grams from the query's perspective
  char isPruned;                        // state of this candidate; it can be pruned by various filters, 0 denotes not pruned, !0 denotes pruned
  string partialStr;

  Candidate(unsigned length, unsigned char wildchar = WILDCHAR)
    : isPruned(POSFILTER_NONE), partialStr(string(length, wildchar)) {}

  void updatePartialString(const string& gram, unsigned char pos) {
    // make sure there was no collision
    if(gram.size() > 0)
      for(unsigned i = 0; i < gram.size(); i++)
        partialStr[i+pos] = gram[i];
  }

  float wildcardEd(const string& s1, const string& s2, const char wildchar = WILDCHAR);
  float wildcardEd(const char* s1, const char* s2, unsigned n, unsigned m, const char wildchar = WILDCHAR);

  // a bunch of filters for the edit distance that use positional information
  void checkMismatches(const Query& query, unsigned gramLength);
  void checkPartialString(const Query& query, const string& prePostQueryStr);
  void checkSubstrFilter(const Query& query, const string& prePostQueryStr, const string& gram,
                         unsigned gramLength, unsigned stringLength, unsigned char stringPos,
                         unsigned char queryPos, unsigned substrFilterCount);
  void applyPosFilters();
};
#pragma pack(pop)

// declarations of member-function specializations follow (the definitions are in the .cc file)
template<>
void
OnDiskMergerAdapt<Array<PosID> >::
detectDuplicateLists(vector<QueryGramList<Array<PosID> >* >& allQgls,
                     vector<WeightedGramList<Array<PosID> >* >& allWgls) const;

template<>
void
OnDiskMergerAdapt<Array<PosID> >::
readNextLists(const Query& query, const string& prePostQueryStr,
              LeafContext<Array<PosID> >& leafContext, unsigned listsToRead,
              vector<unsigned>& results) const;

// WARNING: this currently works only for the edit distance; other similarity functions will NOT work
template<>
void
OnDiskMergerAdapt<Array<PosID> >::
heapMerge(const Query& query, const string& prePostQueryStr,
          LeafContext<Array<PosID> >& leafContext, vector<unsigned>& results) const;

template<>
Array<PosID>::iterator
OnDiskMergerAdapt<Array<PosID> >::
expProbe(Array<PosID>::iterator start, Array<PosID>::iterator end, unsigned value) const;

template<>
void
OnDiskMergerAdapt<Array<PosID> >::
addRemainingCandidates(const Query& query, const string& prePostQueryStr,
                       LeafContext<Array<PosID> >* lc, vector<unsigned>& results) const;

template<>
void
OnDiskMergerAdapt<Array<PosID> >::
processPosID(const Query& query, const string& prePostQueryStr, unsigned gramCode,
             unsigned candiID, vector<unsigned char>& wglPos, unsigned char candiPos,
             bool* positionsMatched, Candidate<Array<PosID> >* c) const;

#endif