You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
If an option --ordering-latency <T> with positive value T (in ms) is provided than every
incoming object will be buffered and uploaded only if no object with the same CCDB path
and earlier start of validity was received in preceding T ms.
All remaining cached objects are uploaded at EOR (or stop() method call).
then the `ObjA` will be uploaded only to the default server (`http://alice-ccdb.cern.ch`), `ObjB` will be uploaded to both default and `local` server and
162
162
`ObjC` will be uploaded to the `local` server only.
163
163
164
-
By default the ccdb-populator-workflow will not produce `fatal` on failed upload. To require it an option `--fatal-on-failure` can be used.
164
+
By default the `ccdb-populator-workflow` will not produce `fatal` on failed upload. To require it an option `--fatal-on-failure` can be used.
165
+
166
+
By default the `ccdb-populator-workflow` uploads objects as it gets them. In case there is a danger that objects of the same URL will arrive in the order not sorted in SOV
167
+
(which may lead to screaning of the object with later SOV by other object if earlier ROF) one can use an option `--ordering-latency <N milliseconds>` of the `ccdb-populator-workflow`.
168
+
Then every incoming object will be buffered and uploaded only if no object with the same CCDB path and earlier start of validity was received in preceding N milliseconds. All remaining cached objects are uploaded at EOR (or stop() method call).
if (runNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
100
-
metadata = *md; // clone since the md from the message is const
LOGP(error, "failed to bufferize a {} object with SOV={}/EOV={} received at {}, conflicting with previously bufferized one SOV={}/EOV={} received at {}",
LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
125
-
} else {
126
-
LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
127
-
}
128
-
}
129
-
// do we need to override previous object?
130
-
if (wrp->isAdjustableEOV() && !mAPI.isSnapshotMode()) {
131
-
o2::ccdb::adjustOverriddenEOV(mAPI, *wrp.get());
154
+
voidCCDBPopulator::checkCache(long delay)
155
+
{
156
+
// check if some entries in cache are ripe enough to upload
157
+
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
158
+
for (auto& pathCache : mOrdCache) {// loop over paths
159
+
if (delay < 0 && pathCache.second.size()) {
160
+
LOGP(important, "Uploading {} cached objects for path {}", pathCache.second.size(), pathCache.first);
161
+
}
162
+
for (auto it = pathCache.second.begin(); it != pathCache.second.end();) { // loop over objects of the path
LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
143
-
} else {
144
-
LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
145
-
}
146
-
} else {
147
-
LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
std::string msg = fmt::format("Storing in ccdb {}{}/{} of size {} valid for {} : {}", cached ? "cached " : "", wrp.getPath(), wrp.getFileName(), pld.size(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
176
+
auto uploadTS = o2::ccdb::getCurrentTimestamp();
177
+
logAsNeeded(uploadTS, wrp.getPath(), msg);
178
+
std::map<std::string, std::string> metadata;
179
+
constauto* md = &wrp.getMetaData();
180
+
if (mRunNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
181
+
metadata = *md; // clone since the md from the message is const
int res = mAPI.storeAsBinaryFile(&pld[0], pld.size(), wrp.getFileName(), wrp.getObjectType(), wrp.getPath(), *md, wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
186
+
if (res) {
187
+
if (mFatalOnFailure) {
188
+
LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
189
+
} else {
190
+
LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
191
+
}
192
+
}
193
+
// if requested, make sure that the new object can be queried
LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
203
+
} else {
204
+
LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
149
205
}
206
+
} else {
207
+
LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
@@ -186,6 +269,7 @@ DataProcessorSpec getCCDBPopulatorDeviceSpec(const std::string& defCCDB, const s
186
269
{"ccdb-path", VariantType::String, defCCDB, {"Path to CCDB"}},
187
270
{"sspec-min", VariantType::Int64, -1L, {"min subspec to accept"}},
188
271
{"sspec-max", VariantType::Int64, -1L, {"max subspec to accept"}},
272
+
{"ordering-latency", VariantType::Int, -1, {"if enabled (positive) bufferize object and upload it if no object with smaller SOV received in given waiting time (ms)"}},
189
273
{"throttling-delay", VariantType::Int64, 300000L, {"produce important type log at most once per this period in ms for each CCDB path"}},
190
274
{"validate-upload", VariantType::Bool, false, {"valider upload by querying its headers"}},
191
275
{"fatal-on-failure", VariantType::Bool, false, {"do not produce fatal on failed upload"}}}};
0 commit comments