@@ -932,7 +932,7 @@ func (qjm *XController) ScheduleNext() {
932932 // the appwrapper from being added in syncjob
933933 defer qjm .schedulingAWAtomicSet (nil )
934934
935- scheduleNextRetrier := retrier .New (retrier .ExponentialBackoff (10 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
935+ scheduleNextRetrier := retrier .New (retrier .ExponentialBackoff (1 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
936936 scheduleNextRetrier .SetJitter (0.05 )
937937 // Retry the execution
938938 err = scheduleNextRetrier .Run (func () error {
@@ -1018,7 +1018,8 @@ func (qjm *XController) ScheduleNext() {
10181018 retryErr = qjm .updateStatusInEtcd (ctx , qj , "ScheduleNext - setHOL" )
10191019 if retryErr != nil {
10201020 if apierrors .IsConflict (retryErr ) {
1021- klog .Warningf ("[ScheduleNext] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v. Retrying update." , qj .Namespace , qj .Name , qj .Status )
1021+ klog .Warningf ("[ScheduleNext] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v this may be due to appwrapper deletion." , qj .Namespace , qj .Name , qj .Status )
1022+ return nil
10221023 } else {
10231024 klog .Errorf ("[ScheduleNext] Failed to updated status in etcd for app wrapper '%s/%s', status = %+v, err=%v" , qj .Namespace , qj .Name , qj .Status , retryErr )
10241025 }
@@ -1068,6 +1069,8 @@ func (qjm *XController) ScheduleNext() {
10681069 }
10691070 return retryErr
10701071 }
1072+ //Remove stale copy
1073+ qjm .eventQueue .Delete (qj )
10711074 if err00 := qjm .eventQueue .Add (qj ); err00 != nil { // unsuccessful add to eventQueue, add back to activeQ
10721075 klog .Errorf ("[ScheduleNext] [Dispatcher Mode] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v" , qj .Name , qj , qj .ResourceVersion , qj .Status , err )
10731076 qjm .qjqueue .MoveToActiveQueueIfExists (qj )
@@ -1224,6 +1227,8 @@ func (qjm *XController) ScheduleNext() {
12241227 }
12251228 tempAW .DeepCopyInto (qj )
12261229 // add to eventQueue for dispatching to Etcd
1230+ // Remove stale copy
1231+ qjm .eventQueue .Delete (qj )
12271232 if err00 := qjm .eventQueue .Add (qj ); err00 != nil { // unsuccessful add to eventQueue, add back to activeQ
12281233 klog .Errorf ("[ScheduleNext] [Agent Mode] Failed to add '%s/%s' to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v" , qj .Namespace ,
12291234 qj .Name , qj , qj .ResourceVersion , qj .Status , err )
@@ -1587,7 +1592,11 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
15871592 accessor .SetDeletionTimestamp (& current_ts )
15881593 }
15891594 klog .V (3 ).Infof ("[Informer-deleteQJ] %s enqueue deletion, deletion ts = %v" , qj .Name , qj .GetDeletionTimestamp ())
1590- cc .enqueue (qj )
1595+ //Remove stale copy
1596+ cc .eventQueue .Delete (qj )
1597+ cc .qjqueue .Delete (qj )
1598+ //Add fresh copy
1599+ cc .eventQueue .Add (qj )
15911600}
15921601
15931602func (cc * XController ) enqueue (obj interface {}) error {
@@ -2146,6 +2155,7 @@ func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper
21462155}
21472156func (cc * XController ) getAppWrapper (namespace string , name string , caller string ) (* arbv1.AppWrapper , error ) {
21482157 klog .V (5 ).Infof ("[getAppWrapper] getting a copy of '%s/%s' when called by '%s'." , namespace , name , caller )
2158+
21492159 apiCacheAWJob , err := cc .appWrapperLister .AppWrappers (namespace ).Get (name )
21502160 if err != nil {
21512161 if ! apierrors .IsNotFound (err ) {
0 commit comments